This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 72e39b8bce Add metrics for UnnestExec (#8482)
72e39b8bce is described below

commit 72e39b8bce867d1f141356a918400b185e6efe74
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Thu Dec 14 23:28:25 2023 +0300

    Add metrics for UnnestExec (#8482)
---
 datafusion/core/tests/dataframe/mod.rs | 24 ++++++++-
 datafusion/physical-plan/src/unnest.rs | 92 +++++++++++++++++++++++-----------
 2 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index c6b8e0e01b..ba661aa244 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -39,7 +39,7 @@ use datafusion::prelude::JoinType;
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
 use datafusion::test_util::parquet_test_data;
 use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
-use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
+use datafusion_common::{assert_contains, DataFusionError, ScalarValue, UnnestOptions};
 use datafusion_execution::config::SessionConfig;
 use datafusion_expr::expr::{GroupingSet, Sort};
 use datafusion_expr::{
@@ -1408,6 +1408,28 @@ async fn unnest_with_redundant_columns() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn unnest_analyze_metrics() -> Result<()> {
+    const NUM_ROWS: usize = 5;
+
+    let df = table_with_nested_types(NUM_ROWS).await?;
+    let results = df
+        .unnest_column("tags")?
+        .explain(false, true)?
+        .collect()
+        .await?;
+    let formatted = arrow::util::pretty::pretty_format_batches(&results)
+        .unwrap()
+        .to_string();
+    assert_contains!(&formatted, "elapsed_compute=");
+    assert_contains!(&formatted, "input_batches=1");
+    assert_contains!(&formatted, "input_rows=5");
+    assert_contains!(&formatted, "output_rows=10");
+    assert_contains!(&formatted, "output_batches=1");
+
+    Ok(())
+}
+
 async fn create_test_table(name: &str) -> Result<DataFrame> {
     let schema = Arc::new(Schema::new(vec![
         Field::new("a", DataType::Utf8, false),
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index af4a81626c..b9e732c317 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -17,8 +17,6 @@
 
 //! Defines the unnest column plan for unnesting values in a column that contains a list
 //! type, conceptually is like joining each row with all the values in the list column.
-
-use std::time::Instant;
 use std::{any::Any, sync::Arc};
 
 use super::DisplayAs;
@@ -44,6 +42,8 @@ use async_trait::async_trait;
 use futures::{Stream, StreamExt};
 use log::trace;
 
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
+
 /// Unnest the given column by joining the row with each value in the
 /// nested type.
 ///
@@ -58,6 +58,8 @@ pub struct UnnestExec {
     column: Column,
     /// Options
     options: UnnestOptions,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl UnnestExec {
@@ -73,6 +75,7 @@ impl UnnestExec {
             schema,
             column,
             options,
+            metrics: Default::default(),
         }
     }
 }
@@ -141,19 +144,58 @@ impl ExecutionPlan for UnnestExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let input = self.input.execute(partition, context)?;
+        let metrics = UnnestMetrics::new(partition, &self.metrics);
 
         Ok(Box::pin(UnnestStream {
             input,
             schema: self.schema.clone(),
             column: self.column.clone(),
             options: self.options.clone(),
-            num_input_batches: 0,
-            num_input_rows: 0,
-            num_output_batches: 0,
-            num_output_rows: 0,
-            unnest_time: 0,
+            metrics,
         }))
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+}
+
+#[derive(Clone, Debug)]
+struct UnnestMetrics {
+    /// total time for column unnesting
+    elapsed_compute: metrics::Time,
+    /// Number of batches consumed
+    input_batches: metrics::Count,
+    /// Number of rows consumed
+    input_rows: metrics::Count,
+    /// Number of batches produced
+    output_batches: metrics::Count,
+    /// Number of rows produced by this operator
+    output_rows: metrics::Count,
+}
+
+impl UnnestMetrics {
+    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition);
+
+        let input_batches =
+            MetricBuilder::new(metrics).counter("input_batches", partition);
+
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+        let output_batches =
+            MetricBuilder::new(metrics).counter("output_batches", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
+        Self {
+            input_batches,
+            input_rows,
+            output_batches,
+            output_rows,
+            elapsed_compute,
+        }
+    }
 }
 
 /// A stream that issues [RecordBatch]es with unnested column data.
@@ -166,16 +208,8 @@ struct UnnestStream {
     column: Column,
     /// Options
     options: UnnestOptions,
-    /// number of input batches
-    num_input_batches: usize,
-    /// number of input rows
-    num_input_rows: usize,
-    /// number of batches produced
-    num_output_batches: usize,
-    /// number of rows produced
-    num_output_rows: usize,
-    /// total time for column unnesting, in ms
-    unnest_time: usize,
+    /// Metrics
+    metrics: UnnestMetrics,
 }
 
 impl RecordBatchStream for UnnestStream {
@@ -207,15 +241,15 @@ impl UnnestStream {
             .poll_next_unpin(cx)
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
-                    let start = Instant::now();
+                    let timer = self.metrics.elapsed_compute.timer();
                     let result =
                         build_batch(&batch, &self.schema, &self.column, &self.options);
-                    self.num_input_batches += 1;
-                    self.num_input_rows += batch.num_rows();
+                    self.metrics.input_batches.add(1);
+                    self.metrics.input_rows.add(batch.num_rows());
                     if let Ok(ref batch) = result {
-                        self.unnest_time += start.elapsed().as_millis() as usize;
-                        self.num_output_batches += 1;
-                        self.num_output_rows += batch.num_rows();
+                        timer.done();
+                        self.metrics.output_batches.add(1);
+                        self.metrics.output_rows.add(batch.num_rows());
                     }
 
                     Some(result)
@@ -223,12 +257,12 @@ impl UnnestStream {
                 other => {
                     trace!(
                         "Processed {} probe-side input batches containing {} rows and \
-                        produced {} output batches containing {} rows in {} ms",
-                        self.num_input_batches,
-                        self.num_input_rows,
-                        self.num_output_batches,
-                        self.num_output_rows,
-                        self.unnest_time,
+                        produced {} output batches containing {} rows in {}",
+                        self.metrics.input_batches,
+                        self.metrics.input_rows,
+                        self.metrics.output_batches,
+                        self.metrics.output_rows,
+                        self.metrics.elapsed_compute,
                     );
                     other
                 }

Reply via email to