This is an automated email from the ASF dual-hosted git repository. xudong963 pushed a commit to branch fix/sort-merge-reservation-starvation in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit da5d59f66d29404d70e643559f24e14e0b862c02 Author: xudong.w <[email protected]> AuthorDate: Fri Feb 13 14:39:37 2026 +0800 Cherry pick add metrics for parquet sink from upstream (#32) Cherry-pick of https://github.com/apache/datafusion/pull/20307 Co-Authored-By: Claude Opus 4.6 <[email protected]> --- datafusion/core/src/dataframe/parquet.rs | 143 +++++++++++++++++++++++ datafusion/datasource-parquet/src/file_format.rs | 32 ++++- datafusion/sqllogictest/test_files/copy.slt | 11 ++ 3 files changed, 183 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index cb8a6cf295..c746dbb1e5 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -324,4 +324,147 @@ mod tests { Ok(()) } + + /// Test that ParquetSink exposes rows_written, bytes_written, and + /// elapsed_compute metrics via DataSinkExec. + #[tokio::test] + async fn test_parquet_sink_metrics() -> Result<()> { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_execution::TaskContext; + + use futures::TryStreamExt; + + let ctx = SessionContext::new(); + let tmp_dir = TempDir::new()?; + let output_path = tmp_dir.path().join("metrics_test.parquet"); + let output_path_str = output_path.to_str().unwrap(); + + // Register a table with 100 rows + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("val", DataType::Int32, false), + ])); + let ids: Vec<i32> = (0..100).collect(); + let vals: Vec<i32> = (100..200).collect(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(vals)), + ], + )?; + ctx.register_batch("source", batch)?; + + // Create the physical plan for COPY TO + let df = ctx + .sql(&format!( + "COPY source TO '{output_path_str}' STORED AS PARQUET" + )) + .await?; + let plan = df.create_physical_plan().await?; + + // Execute the plan + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx)?; + let _batches: Vec<_> = stream.try_collect().await?; + + // Check metrics on the DataSinkExec (top-level plan) + let metrics = plan + .metrics() + .expect("DataSinkExec should return metrics from ParquetSink"); + let aggregated = metrics.aggregate_by_name(); + + // rows_written should be 100 + let rows_written = aggregated + .iter() + .find(|m| m.value().name() == "rows_written") + .expect("should have rows_written metric"); + assert_eq!( + rows_written.value().as_usize(), + 100, + "expected 100 rows written" + ); + + // bytes_written should be > 0 + let bytes_written = aggregated + .iter() + .find(|m| m.value().name() == "bytes_written") + .expect("should have bytes_written metric"); + assert!( + bytes_written.value().as_usize() > 0, + "expected bytes_written > 0, got {}", + bytes_written.value().as_usize() + ); + + // elapsed_compute should be > 0 + let elapsed = aggregated + .iter() + .find(|m| m.value().name() == "elapsed_compute") + .expect("should have elapsed_compute metric"); + assert!( + elapsed.value().as_usize() > 0, + "expected elapsed_compute > 0" + ); + + Ok(()) + } + + /// Test that ParquetSink metrics work with single_file_parallelism enabled. + #[tokio::test] + async fn test_parquet_sink_metrics_parallel() -> Result<()> { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_execution::TaskContext; + + use futures::TryStreamExt; + + let ctx = SessionContext::new(); + ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true") + .await? + .collect() + .await?; + + let tmp_dir = TempDir::new()?; + let output_path = tmp_dir.path().join("metrics_parallel.parquet"); + let output_path_str = output_path.to_str().unwrap(); + + let schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let ids: Vec<i32> = (0..50).collect(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(ids))], + )?; + ctx.register_batch("source2", batch)?; + + let df = ctx + .sql(&format!( + "COPY source2 TO '{output_path_str}' STORED AS PARQUET" + )) + .await?; + let plan = df.create_physical_plan().await?; + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx)?; + let _batches: Vec<_> = stream.try_collect().await?; + + let metrics = plan.metrics().expect("DataSinkExec should return metrics"); + let aggregated = metrics.aggregate_by_name(); + + let rows_written = aggregated + .iter() + .find(|m| m.value().name() == "rows_written") + .expect("should have rows_written metric"); + assert_eq!(rows_written.value().as_usize(), 50); + + let bytes_written = aggregated + .iter() + .find(|m| m.value().name() == "bytes_written") + .expect("should have bytes_written metric"); + assert!(bytes_written.value().as_usize() > 0); + + Ok(()) + } } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index f27bda387f..e26607e319 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -54,6 +54,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReserv use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; +use datafusion_physical_plan::metrics::{ + ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, +}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; @@ -1079,6 +1082,8 @@ pub struct ParquetSink { /// File metadata from successfully produced parquet files. The Mutex is only used /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all. written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>, + /// Metrics for tracking write operations + metrics: ExecutionPlanMetricsSet, } impl Debug for ParquetSink { @@ -1110,6 +1115,7 @@ impl ParquetSink { config, parquet_options, written: Default::default(), + metrics: ExecutionPlanMetricsSet::new(), } } @@ -1240,6 +1246,14 @@ impl FileSink for ParquetSink { mut file_stream_rx: DemuxedStreamReceiver, object_store: Arc<dyn ObjectStore>, ) -> Result<u64> { + let rows_written_counter = + MetricBuilder::new(&self.metrics).global_counter("rows_written"); + let bytes_written_counter = + MetricBuilder::new(&self.metrics).global_counter("bytes_written"); + let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0); + + let write_start = datafusion_common::instant::Instant::now(); + let parquet_opts = &self.parquet_options; let mut file_write_tasks: JoinSet< @@ -1317,12 +1331,18 @@ impl FileSink for ParquetSink { } } - let mut row_count = 0; while let Some(result) = file_write_tasks.join_next().await { match result { Ok(r) => { let (path, parquet_meta_data) = r?; - row_count += parquet_meta_data.file_metadata().num_rows(); + let file_rows = parquet_meta_data.file_metadata().num_rows() as usize; + let file_bytes: usize = parquet_meta_data + .row_groups() + .iter() + .map(|rg| rg.compressed_size() as usize) + .sum(); + rows_written_counter.add(file_rows); + bytes_written_counter.add(file_bytes); let mut written_files = self.written.lock(); written_files .try_insert(path.clone(), parquet_meta_data) @@ -1344,7 +1364,9 @@ impl FileSink for ParquetSink { .await .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; - Ok(row_count as u64) + elapsed_compute.add_elapsed(write_start); + + Ok(rows_written_counter.value() as u64) } } @@ -1354,6 +1376,10 @@ impl DataSink for ParquetSink { self } + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + fn schema(&self) -> &SchemaRef { self.config.output_schema() } diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 9af0dc6393..b050acd4b3 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -200,6 +200,17 @@ physical_plan 01)DataSinkExec: sink=ParquetSink(file_groups=[]) 02)--DataSourceExec: partitions=1, partition_sizes=[1] +# Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics +# Use a query with Sort and Projection to verify metrics across all operators +query TT +EXPLAIN ANALYZE COPY (SELECT col1, upper(col2) AS col2_upper FROM source_table ORDER BY col1) TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET; +---- +Plan with Metrics +01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=<slt:ignore>, bytes_written=<slt:ignore>, rows_written=2] +02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batches_split=0] +03)----ProjectionExec: expr=[col1@0 as col1, upper(col2@1) as col2_upper], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>] +04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + # Copy to directory as partitioned files with keep_partition_by_columns enabled query I COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
