This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 5400d71607 Reuse `BaselineMetrics` in `UnnestMetrics` (#16497) 5400d71607 is described below commit 5400d71607424f15c49af0d9ee2ab006776a31f5 Author: Hendrik Makait <hend...@makait.com> AuthorDate: Tue Jun 24 21:40:09 2025 +0200 Reuse `BaselineMetrics` in `UnnestMetrics` (#16497) * Use BaselineMetrics in UnnestMetrics * fmt * remove comment * Avoid leak into poll * fmt --- datafusion/physical-plan/src/metrics/baseline.rs | 2 +- datafusion/physical-plan/src/unnest.rs | 31 +++++++++++------------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index de436d0e4f..b57652d4b6 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -45,7 +45,7 @@ use datafusion_common::Result; /// ``` #[derive(Debug, Clone)] pub struct BaselineMetrics { - /// end_time is set when `ExecutionMetrics::done()` is called + /// end_time is set when `BaselineMetrics::done()` is called end_time: Timestamp, /// amount of time the operator was actively trying to use the CPU diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index c06b09f2fe..e36cd2b6c2 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -21,7 +21,10 @@ use std::cmp::{self, Ordering}; use std::task::{ready, Poll}; use std::{any::Any, sync::Arc}; -use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; +use super::metrics::{ + self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, + RecordOutput, +}; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, @@ -38,13 +41,12 @@ use arrow::compute::{cast, is_not_null, kernels, sum}; use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_ord::cmp::lt; +use async_trait::async_trait; use datafusion_common::{ exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions, }; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; - -use async_trait::async_trait; use futures::{Stream, StreamExt}; use log::trace; @@ -203,22 +205,18 @@ impl ExecutionPlan for UnnestExec { #[derive(Clone, Debug)] struct UnnestMetrics { - /// Total time for column unnesting - elapsed_compute: metrics::Time, + /// Execution metrics + baseline_metrics: BaselineMetrics, /// Number of batches consumed input_batches: metrics::Count, /// Number of rows consumed input_rows: metrics::Count, /// Number of batches produced output_batches: metrics::Count, - /// Number of rows produced by this operator - output_rows: metrics::Count, } impl UnnestMetrics { fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { - let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition); - let input_batches = MetricBuilder::new(metrics).counter("input_batches", partition); @@ -227,14 +225,11 @@ impl UnnestMetrics { let output_batches = MetricBuilder::new(metrics).counter("output_batches", partition); - let output_rows = MetricBuilder::new(metrics).output_rows(partition); - Self { + baseline_metrics: BaselineMetrics::new(metrics, partition), input_batches, input_rows, output_batches, - output_rows, - elapsed_compute, } } } @@ -284,7 +279,9 @@ impl UnnestStream { loop { return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - let timer = self.metrics.elapsed_compute.timer(); + let elapsed_compute = + self.metrics.baseline_metrics.elapsed_compute().clone(); + let timer = elapsed_compute.timer(); self.metrics.input_batches.add(1); self.metrics.input_rows.add(batch.num_rows()); let result = build_batch( @@ -299,7 +296,7 @@ impl UnnestStream { continue; }; self.metrics.output_batches.add(1); - self.metrics.output_rows.add(result_batch.num_rows()); + (&result_batch).record_output(&self.metrics.baseline_metrics); // Empty record batches should not be emitted. // They need to be treated as [`Option<RecordBatch>`]es and handled separately @@ -313,8 +310,8 @@ impl UnnestStream { self.metrics.input_batches, self.metrics.input_rows, self.metrics.output_batches, - self.metrics.output_rows, - self.metrics.elapsed_compute, + self.metrics.baseline_metrics.output_rows(), + self.metrics.baseline_metrics.elapsed_compute(), ); other } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org