This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 2c8241a4dc feat(small): Add `BaselineMetrics` to `generate_series()` table function (#16255) 2c8241a4dc is described below commit 2c8241a4dcb082191e89506c3cd58be79918d0a1 Author: Yongting You <2010you...@gmail.com> AuthorDate: Fri Jun 6 04:42:28 2025 +0800 feat(small): Add `BaselineMetrics` to `generate_series()` table function (#16255) * Add BaselineMetrics to LazyMemoryStream * UT --- datafusion/physical-plan/src/memory.rs | 61 +++++++++++++++++++++++- datafusion/physical-plan/src/metrics/baseline.rs | 7 +-- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 1bc872a56e..c232970b21 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::execution_plan::{Boundedness, EmissionType}; +use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -146,6 +147,8 @@ pub struct LazyMemoryExec { batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode cache: PlanProperties, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, } impl LazyMemoryExec { @@ -164,6 +167,7 @@ impl LazyMemoryExec { schema, batch_generators: generators, cache, + metrics: ExecutionPlanMetricsSet::new(), }) } } @@ -254,12 +258,18 @@ impl ExecutionPlan for LazyMemoryExec { ); } + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(LazyMemoryStream { schema: Arc::clone(&self.schema), generator: Arc::clone(&self.batch_generators[partition]), + baseline_metrics, })) } + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Result<Statistics> { Ok(Statistics::new_unknown(&self.schema)) } @@ -276,6 +286,8 @@ pub struct LazyMemoryStream { /// parallel execution. /// Sharing generators between streams should be used with caution. generator: Arc<RwLock<dyn LazyBatchGenerator>>, + /// Execution metrics + baseline_metrics: BaselineMetrics, } impl Stream for LazyMemoryStream { @@ -285,13 +297,16 @@ impl Stream for LazyMemoryStream { self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { + let _timer_guard = self.baseline_metrics.elapsed_compute().timer(); let batch = self.generator.write().generate_next_batch(); - match batch { + let poll = match batch { Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))), Ok(None) => Poll::Ready(None), Err(e) => Poll::Ready(Some(Err(e))), - } + }; + + self.baseline_metrics.record_poll(poll) } } @@ -304,6 +319,7 @@ impl RecordBatchStream for LazyMemoryStream { #[cfg(test)] mod lazy_memory_tests { use super::*; + use crate::common::collect; use arrow::array::Int64Array; use arrow::datatypes::{DataType, Field, Schema}; use futures::StreamExt; @@ -419,4 +435,45 @@ mod lazy_memory_tests { Ok(()) } + + #[tokio::test] + async fn test_generate_series_metrics_integration() -> Result<()> { + // Test LazyMemoryExec metrics with different configurations + let test_cases = vec![ + (10, 2, 10), // 10 rows, batch size 2, expected 10 rows + (100, 10, 100), // 100 rows, batch size 10, expected 100 rows + (5, 1, 5), // 5 rows, batch size 1, expected 5 rows + ]; + + for (total_rows, batch_size, expected_rows) in test_cases { + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let generator = TestGenerator { + counter: 0, + max_batches: (total_rows + batch_size - 1) / batch_size, // ceiling division + batch_size: batch_size as usize, + schema: Arc::clone(&schema), + }; + + let exec = + LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?; + let task_ctx = Arc::new(TaskContext::default()); + + let stream = exec.execute(0, task_ctx)?; + let batches = collect(stream).await?; + + // Verify metrics exist with actual expected numbers + let metrics = exec.metrics().unwrap(); + + // Count actual rows returned + let actual_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(actual_rows, expected_rows); + + // Verify metrics match actual output + assert_eq!(metrics.output_rows().unwrap(), expected_rows); + assert!(metrics.elapsed_compute().unwrap() > 0); + } + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index a4a83b84b6..de436d0e4f 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -117,9 +117,10 @@ impl BaselineMetrics { } } - /// Process a poll result of a stream producing output for an - /// operator, recording the output rows and stream done time and - /// returning the same poll result + /// Process a poll result of a stream producing output for an operator. + /// + /// Note: this method only updates `output_rows` and `end_time` metrics. + /// Remember to update `elapsed_compute` and other metrics manually. pub fn record_poll( &self, poll: Poll<Option<Result<RecordBatch>>>, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org