This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 72e39b8bce Add metrics for UnnestExec (#8482)
72e39b8bce is described below
commit 72e39b8bce867d1f141356a918400b185e6efe74
Author: Simon Vandel Sillesen <[email protected]>
AuthorDate: Thu Dec 14 23:28:25 2023 +0300
Add metrics for UnnestExec (#8482)
---
datafusion/core/tests/dataframe/mod.rs | 24 ++++++++-
datafusion/physical-plan/src/unnest.rs | 92 +++++++++++++++++++++++-----------
2 files changed, 86 insertions(+), 30 deletions(-)
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index c6b8e0e01b..ba661aa244 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -39,7 +39,7 @@ use datafusion::prelude::JoinType;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use datafusion::test_util::parquet_test_data;
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
-use datafusion_common::{DataFusionError, ScalarValue, UnnestOptions};
+use datafusion_common::{assert_contains, DataFusionError, ScalarValue, UnnestOptions};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::{
@@ -1408,6 +1408,28 @@ async fn unnest_with_redundant_columns() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn unnest_analyze_metrics() -> Result<()> {
+ const NUM_ROWS: usize = 5;
+
+ let df = table_with_nested_types(NUM_ROWS).await?;
+ let results = df
+ .unnest_column("tags")?
+ .explain(false, true)?
+ .collect()
+ .await?;
+ let formatted = arrow::util::pretty::pretty_format_batches(&results)
+ .unwrap()
+ .to_string();
+ assert_contains!(&formatted, "elapsed_compute=");
+ assert_contains!(&formatted, "input_batches=1");
+ assert_contains!(&formatted, "input_rows=5");
+ assert_contains!(&formatted, "output_rows=10");
+ assert_contains!(&formatted, "output_batches=1");
+
+ Ok(())
+}
+
async fn create_test_table(name: &str) -> Result<DataFrame> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index af4a81626c..b9e732c317 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -17,8 +17,6 @@
//! Defines the unnest column plan for unnesting values in a column that contains a list
//! type, conceptually is like joining each row with all the values in the list column.
-
-use std::time::Instant;
use std::{any::Any, sync::Arc};
use super::DisplayAs;
@@ -44,6 +42,8 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use log::trace;
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
+
/// Unnest the given column by joining the row with each value in the
/// nested type.
///
@@ -58,6 +58,8 @@ pub struct UnnestExec {
column: Column,
/// Options
options: UnnestOptions,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl UnnestExec {
@@ -73,6 +75,7 @@ impl UnnestExec {
schema,
column,
options,
+ metrics: Default::default(),
}
}
}
@@ -141,19 +144,58 @@ impl ExecutionPlan for UnnestExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
+ let metrics = UnnestMetrics::new(partition, &self.metrics);
Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
column: self.column.clone(),
options: self.options.clone(),
- num_input_batches: 0,
- num_input_rows: 0,
- num_output_batches: 0,
- num_output_rows: 0,
- unnest_time: 0,
+ metrics,
}))
}
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+}
+
+#[derive(Clone, Debug)]
+struct UnnestMetrics {
+ /// total time for column unnesting
+ elapsed_compute: metrics::Time,
+ /// Number of batches consumed
+ input_batches: metrics::Count,
+ /// Number of rows consumed
+ input_rows: metrics::Count,
+ /// Number of batches produced
+ output_batches: metrics::Count,
+ /// Number of rows produced by this operator
+ output_rows: metrics::Count,
+}
+
+impl UnnestMetrics {
+ fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+ let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition);
+
+ let input_batches =
+ MetricBuilder::new(metrics).counter("input_batches", partition);
+
+ let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+ let output_batches =
+ MetricBuilder::new(metrics).counter("output_batches", partition);
+
+ let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
+ Self {
+ input_batches,
+ input_rows,
+ output_batches,
+ output_rows,
+ elapsed_compute,
+ }
+ }
}
/// A stream that issues [RecordBatch]es with unnested column data.
@@ -166,16 +208,8 @@ struct UnnestStream {
column: Column,
/// Options
options: UnnestOptions,
- /// number of input batches
- num_input_batches: usize,
- /// number of input rows
- num_input_rows: usize,
- /// number of batches produced
- num_output_batches: usize,
- /// number of rows produced
- num_output_rows: usize,
- /// total time for column unnesting, in ms
- unnest_time: usize,
+ /// Metrics
+ metrics: UnnestMetrics,
}
impl RecordBatchStream for UnnestStream {
@@ -207,15 +241,15 @@ impl UnnestStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
- let start = Instant::now();
+ let timer = self.metrics.elapsed_compute.timer();
let result =
build_batch(&batch, &self.schema, &self.column, &self.options);
- self.num_input_batches += 1;
- self.num_input_rows += batch.num_rows();
+ self.metrics.input_batches.add(1);
+ self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
- self.unnest_time += start.elapsed().as_millis() as usize;
- self.num_output_batches += 1;
- self.num_output_rows += batch.num_rows();
+ timer.done();
+ self.metrics.output_batches.add(1);
+ self.metrics.output_rows.add(batch.num_rows());
}
Some(result)
@@ -223,12 +257,12 @@ impl UnnestStream {
other => {
trace!(
"Processed {} probe-side input batches containing {} rows and \
- produced {} output batches containing {} rows in {} ms",
- self.num_input_batches,
- self.num_input_rows,
- self.num_output_batches,
- self.num_output_rows,
- self.unnest_time,
+ produced {} output batches containing {} rows in {}",
+ self.metrics.input_batches,
+ self.metrics.input_rows,
+ self.metrics.output_batches,
+ self.metrics.output_rows,
+ self.metrics.elapsed_compute,
);
other
}