This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c8241a4dc feat(small): Add `BaselineMetrics` to `generate_series()` table function (#16255)
2c8241a4dc is described below

commit 2c8241a4dcb082191e89506c3cd58be79918d0a1
Author: Yongting You <2010you...@gmail.com>
AuthorDate: Fri Jun 6 04:42:28 2025 +0800

    feat(small): Add `BaselineMetrics` to `generate_series()` table function (#16255)
    
    * Add BaselineMetrics to LazyMemoryStream
    
    * UT
---
 datafusion/physical-plan/src/memory.rs           | 61 +++++++++++++++++++++++-
 datafusion/physical-plan/src/metrics/baseline.rs |  7 +--
 2 files changed, 63 insertions(+), 5 deletions(-)
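
Editor's note: a minimal sketch (not part of the commit) of where the new metrics become visible to users. It assumes a stock `SessionContext` with the built-in `generate_series()` table function registered; with `BaselineMetrics` wired into `LazyMemoryStream`, the `LazyMemoryExec` line in `EXPLAIN ANALYZE` output should now report `output_rows` and `elapsed_compute`.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // generate_series() is planned as a LazyMemoryExec; the metrics added in
    // this commit are what EXPLAIN ANALYZE prints for that operator.
    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000)")
        .await?;
    df.show().await?;
    Ok(())
}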

diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index 1bc872a56e..c232970b21 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::execution_plan::{Boundedness, EmissionType};
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -146,6 +147,8 @@ pub struct LazyMemoryExec {
     batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
     /// Plan properties cache storing equivalence properties, partitioning, and execution mode
     cache: PlanProperties,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl LazyMemoryExec {
@@ -164,6 +167,7 @@ impl LazyMemoryExec {
             schema,
             batch_generators: generators,
             cache,
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 }
@@ -254,12 +258,18 @@ impl ExecutionPlan for LazyMemoryExec {
             );
         }
 
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         Ok(Box::pin(LazyMemoryStream {
             schema: Arc::clone(&self.schema),
             generator: Arc::clone(&self.batch_generators[partition]),
+            baseline_metrics,
         }))
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn statistics(&self) -> Result<Statistics> {
         Ok(Statistics::new_unknown(&self.schema))
     }
@@ -276,6 +286,8 @@ pub struct LazyMemoryStream {
     /// parallel execution.
     /// Sharing generators between streams should be used with caution.
     generator: Arc<RwLock<dyn LazyBatchGenerator>>,
+    /// Execution metrics
+    baseline_metrics: BaselineMetrics,
 }
 
 impl Stream for LazyMemoryStream {
@@ -285,13 +297,16 @@ impl Stream for LazyMemoryStream {
         self: std::pin::Pin<&mut Self>,
         _: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        let _timer_guard = self.baseline_metrics.elapsed_compute().timer();
         let batch = self.generator.write().generate_next_batch();
 
-        match batch {
+        let poll = match batch {
             Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
             Ok(None) => Poll::Ready(None),
             Err(e) => Poll::Ready(Some(Err(e))),
-        }
+        };
+
+        self.baseline_metrics.record_poll(poll)
     }
 }
 
@@ -304,6 +319,7 @@ impl RecordBatchStream for LazyMemoryStream {
 #[cfg(test)]
 mod lazy_memory_tests {
     use super::*;
+    use crate::common::collect;
     use arrow::array::Int64Array;
     use arrow::datatypes::{DataType, Field, Schema};
     use futures::StreamExt;
@@ -419,4 +435,45 @@ mod lazy_memory_tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_generate_series_metrics_integration() -> Result<()> {
+        // Test LazyMemoryExec metrics with different configurations
+        let test_cases = vec![
+            (10, 2, 10),    // 10 rows, batch size 2, expected 10 rows
+            (100, 10, 100), // 100 rows, batch size 10, expected 100 rows
+            (5, 1, 5),      // 5 rows, batch size 1, expected 5 rows
+        ];
+
+        for (total_rows, batch_size, expected_rows) in test_cases {
+            let schema =
+                Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
+            let generator = TestGenerator {
+                counter: 0,
+                max_batches: (total_rows + batch_size - 1) / batch_size, // ceiling division
+                batch_size: batch_size as usize,
+                schema: Arc::clone(&schema),
+            };
+
+            let exec =
+                LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;
+            let task_ctx = Arc::new(TaskContext::default());
+
+            let stream = exec.execute(0, task_ctx)?;
+            let batches = collect(stream).await?;
+
+            // Verify metrics exist with actual expected numbers
+            let metrics = exec.metrics().unwrap();
+
+            // Count actual rows returned
+            let actual_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+            assert_eq!(actual_rows, expected_rows);
+
+            // Verify metrics match actual output
+            assert_eq!(metrics.output_rows().unwrap(), expected_rows);
+            assert!(metrics.elapsed_compute().unwrap() > 0);
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs
index a4a83b84b6..de436d0e4f 100644
--- a/datafusion/physical-plan/src/metrics/baseline.rs
+++ b/datafusion/physical-plan/src/metrics/baseline.rs
@@ -117,9 +117,10 @@ impl BaselineMetrics {
         }
     }
 
-    /// Process a poll result of a stream producing output for an
-    /// operator, recording the output rows and stream done time and
-    /// returning the same poll result
+    /// Process a poll result of a stream producing output for an operator.
+    ///
+    /// Note: this method only updates `output_rows` and `end_time` metrics.
+    /// Remember to update `elapsed_compute` and other metrics manually.
     pub fn record_poll(
         &self,
         poll: Poll<Option<Result<RecordBatch>>>,
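
Editor's note: the baseline.rs doc change above captures the key caveat: `record_poll()` only updates `output_rows` and `end_time`, so a stream has to time its own work via `elapsed_compute()`. Below is a standalone sketch of that pattern (a hypothetical `BufferedBatchStream`, not part of this commit) mirroring the `LazyMemoryStream` change above.

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use futures::Stream;

/// Hypothetical stream that drains a pre-built list of batches.
struct BufferedBatchStream {
    batches: Vec<RecordBatch>,
    baseline_metrics: BaselineMetrics,
}

impl BufferedBatchStream {
    /// Wire the stream to partition 0 of an operator's metrics set.
    fn new(batches: Vec<RecordBatch>, metrics: &ExecutionPlanMetricsSet) -> Self {
        Self {
            batches,
            baseline_metrics: BaselineMetrics::new(metrics, 0),
        }
    }
}

impl Stream for BufferedBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Time the work done in this poll ourselves; record_poll() does not.
        let _timer = this.baseline_metrics.elapsed_compute().timer();
        let poll = match this.batches.pop() {
            Some(batch) => Poll::Ready(Some(Ok(batch))),
            None => Poll::Ready(None),
        };
        // record_poll() counts output rows and marks end_time once the
        // stream reports completion.
        this.baseline_metrics.record_poll(poll)
    }
}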

