This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5400d71607 Reuse `BaselineMetrics` in `UnnestMetrics` (#16497)
5400d71607 is described below

commit 5400d71607424f15c49af0d9ee2ab006776a31f5
Author: Hendrik Makait <hend...@makait.com>
AuthorDate: Tue Jun 24 21:40:09 2025 +0200

    Reuse `BaselineMetrics` in `UnnestMetrics` (#16497)
    
    * Use BaselineMetrics in UnnestMetrics
    
    * fmt
    
    * remove comment
    
    * Avoid leak into poll
    
    * fmt
---
 datafusion/physical-plan/src/metrics/baseline.rs |  2 +-
 datafusion/physical-plan/src/unnest.rs           | 31 +++++++++++-------------
 2 files changed, 15 insertions(+), 18 deletions(-)

diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs
index de436d0e4f..b57652d4b6 100644
--- a/datafusion/physical-plan/src/metrics/baseline.rs
+++ b/datafusion/physical-plan/src/metrics/baseline.rs
@@ -45,7 +45,7 @@ use datafusion_common::Result;
 /// ```
 #[derive(Debug, Clone)]
 pub struct BaselineMetrics {
-    /// end_time is set when `ExecutionMetrics::done()` is called
+    /// end_time is set when `BaselineMetrics::done()` is called
     end_time: Timestamp,
 
     /// amount of time the operator was actively trying to use the CPU
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index c06b09f2fe..e36cd2b6c2 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -21,7 +21,10 @@ use std::cmp::{self, Ordering};
 use std::task::{ready, Poll};
 use std::{any::Any, sync::Arc};
 
-use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
+use super::metrics::{
+    self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+    RecordOutput,
+};
 use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
 use crate::{
     DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
@@ -38,13 +41,12 @@ use arrow::compute::{cast, is_not_null, kernels, sum};
 use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_ord::cmp::lt;
+use async_trait::async_trait;
 use datafusion_common::{
     exec_datafusion_err, exec_err, internal_err, HashMap, HashSet, Result, UnnestOptions,
 };
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::EquivalenceProperties;
-
-use async_trait::async_trait;
 use futures::{Stream, StreamExt};
 use log::trace;
 
@@ -203,22 +205,18 @@ impl ExecutionPlan for UnnestExec {
 
 #[derive(Clone, Debug)]
 struct UnnestMetrics {
-    /// Total time for column unnesting
-    elapsed_compute: metrics::Time,
+    /// Execution metrics
+    baseline_metrics: BaselineMetrics,
     /// Number of batches consumed
     input_batches: metrics::Count,
     /// Number of rows consumed
     input_rows: metrics::Count,
     /// Number of batches produced
     output_batches: metrics::Count,
-    /// Number of rows produced by this operator
-    output_rows: metrics::Count,
 }
 
 impl UnnestMetrics {
     fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
-        let elapsed_compute = MetricBuilder::new(metrics).elapsed_compute(partition);
-
         let input_batches =
             MetricBuilder::new(metrics).counter("input_batches", partition);
 
@@ -227,14 +225,11 @@ impl UnnestMetrics {
         let output_batches =
             MetricBuilder::new(metrics).counter("output_batches", partition);
 
-        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
-
         Self {
+            baseline_metrics: BaselineMetrics::new(metrics, partition),
             input_batches,
             input_rows,
             output_batches,
-            output_rows,
-            elapsed_compute,
         }
     }
 }
@@ -284,7 +279,9 @@ impl UnnestStream {
         loop {
             return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
-                    let timer = self.metrics.elapsed_compute.timer();
+                    let elapsed_compute =
+                        self.metrics.baseline_metrics.elapsed_compute().clone();
+                    let timer = elapsed_compute.timer();
                     self.metrics.input_batches.add(1);
                     self.metrics.input_rows.add(batch.num_rows());
                     let result = build_batch(
@@ -299,7 +296,7 @@ impl UnnestStream {
                         continue;
                     };
                     self.metrics.output_batches.add(1);
-                    self.metrics.output_rows.add(result_batch.num_rows());
+                    (&result_batch).record_output(&self.metrics.baseline_metrics);
 
                     // Empty record batches should not be emitted.
                     // They need to be treated as  [`Option<RecordBatch>`]es and handled separately
@@ -313,8 +310,8 @@ impl UnnestStream {
                         self.metrics.input_batches,
                         self.metrics.input_rows,
                         self.metrics.output_batches,
-                        self.metrics.output_rows,
-                        self.metrics.elapsed_compute,
+                        self.metrics.baseline_metrics.output_rows(),
+                        self.metrics.baseline_metrics.elapsed_compute(),
                     );
                     other
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to