This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch fix/sort-merge-reservation-starvation
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit da5d59f66d29404d70e643559f24e14e0b862c02
Author: xudong.w <[email protected]>
AuthorDate: Fri Feb 13 14:39:37 2026 +0800

    Cherry pick add metrics for parquet sink from upstream (#32)
    
    Cherry-pick of https://github.com/apache/datafusion/pull/20307
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 datafusion/core/src/dataframe/parquet.rs         | 143 +++++++++++++++++++++++
 datafusion/datasource-parquet/src/file_format.rs |  32 ++++-
 datafusion/sqllogictest/test_files/copy.slt      |  11 ++
 3 files changed, 183 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index cb8a6cf295..c746dbb1e5 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -324,4 +324,147 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that ParquetSink exposes rows_written, bytes_written, and
+    /// elapsed_compute metrics via DataSinkExec.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_test.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        // Register a table with 100 rows
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("val", DataType::Int32, false),
+        ]));
+        let ids: Vec<i32> = (0..100).collect();
+        let vals: Vec<i32> = (100..200).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(vals)),
+            ],
+        )?;
+        ctx.register_batch("source", batch)?;
+
+        // Create the physical plan for COPY TO
+        let df = ctx
+            .sql(&format!(
+                "COPY source TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+
+        // Execute the plan
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
+
+        // Check metrics on the DataSinkExec (top-level plan)
+        let metrics = plan
+            .metrics()
+            .expect("DataSinkExec should return metrics from ParquetSink");
+        let aggregated = metrics.aggregate_by_name();
+
+        // rows_written should be 100
+        let rows_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "rows_written")
+            .expect("should have rows_written metric");
+        assert_eq!(
+            rows_written.value().as_usize(),
+            100,
+            "expected 100 rows written"
+        );
+
+        // bytes_written should be > 0
+        let bytes_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "bytes_written")
+            .expect("should have bytes_written metric");
+        assert!(
+            bytes_written.value().as_usize() > 0,
+            "expected bytes_written > 0, got {}",
+            bytes_written.value().as_usize()
+        );
+
+        // elapsed_compute should be > 0
+        let elapsed = aggregated
+            .iter()
+            .find(|m| m.value().name() == "elapsed_compute")
+            .expect("should have elapsed_compute metric");
+        assert!(
+            elapsed.value().as_usize() > 0,
+            "expected elapsed_compute > 0"
+        );
+
+        Ok(())
+    }
+
+    /// Test that ParquetSink metrics work with single_file_parallelism enabled.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics_parallel() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
+        ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
+            .await?
+            .collect()
+            .await?;
+
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_parallel.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let ids: Vec<i32> = (0..50).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(ids))],
+        )?;
+        ctx.register_batch("source2", batch)?;
+
+        let df = ctx
+            .sql(&format!(
+                "COPY source2 TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
+
+        let metrics = plan.metrics().expect("DataSinkExec should return metrics");
+        let aggregated = metrics.aggregate_by_name();
+
+        let rows_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "rows_written")
+            .expect("should have rows_written metric");
+        assert_eq!(rows_written.value().as_usize(), 50);
+
+        let bytes_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "bytes_written")
+            .expect("should have bytes_written metric");
+        assert!(bytes_written.value().as_usize() > 0);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index f27bda387f..e26607e319 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -54,6 +54,9 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReserv
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
+use datafusion_physical_plan::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
@@ -1079,6 +1082,8 @@ pub struct ParquetSink {
     /// File metadata from successfully produced parquet files. The Mutex is only used
     /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
     written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
+    /// Metrics for tracking write operations
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl Debug for ParquetSink {
@@ -1110,6 +1115,7 @@ impl ParquetSink {
             config,
             parquet_options,
             written: Default::default(),
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 
@@ -1240,6 +1246,14 @@ impl FileSink for ParquetSink {
         mut file_stream_rx: DemuxedStreamReceiver,
         object_store: Arc<dyn ObjectStore>,
     ) -> Result<u64> {
+        let rows_written_counter =
+            MetricBuilder::new(&self.metrics).global_counter("rows_written");
+        let bytes_written_counter =
+            MetricBuilder::new(&self.metrics).global_counter("bytes_written");
+        let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
+
+        let write_start = datafusion_common::instant::Instant::now();
+
         let parquet_opts = &self.parquet_options;
 
         let mut file_write_tasks: JoinSet<
@@ -1317,12 +1331,18 @@ impl FileSink for ParquetSink {
             }
         }
 
-        let mut row_count = 0;
         while let Some(result) = file_write_tasks.join_next().await {
             match result {
                 Ok(r) => {
                     let (path, parquet_meta_data) = r?;
-                    row_count += parquet_meta_data.file_metadata().num_rows();
+                    let file_rows = parquet_meta_data.file_metadata().num_rows() as usize;
+                    let file_bytes: usize = parquet_meta_data
+                        .row_groups()
+                        .iter()
+                        .map(|rg| rg.compressed_size() as usize)
+                        .sum();
+                    rows_written_counter.add(file_rows);
+                    bytes_written_counter.add(file_bytes);
                     let mut written_files = self.written.lock();
                     written_files
                         .try_insert(path.clone(), parquet_meta_data)
@@ -1344,7 +1364,9 @@ impl FileSink for ParquetSink {
             .await
             .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
 
-        Ok(row_count as u64)
+        elapsed_compute.add_elapsed(write_start);
+
+        Ok(rows_written_counter.value() as u64)
     }
 }
 
@@ -1354,6 +1376,10 @@ impl DataSink for ParquetSink {
         self
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn schema(&self) -> &SchemaRef {
         self.config.output_schema()
     }
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 9af0dc6393..b050acd4b3 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -200,6 +200,17 @@ physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
 02)--DataSourceExec: partitions=1, partition_sizes=[1]
 
+# Verify ParquetSink exposes rows_written, bytes_written, and elapsed_compute metrics
+# Use a query with Sort and Projection to verify metrics across all operators
+query TT
+EXPLAIN ANALYZE COPY (SELECT col1, upper(col2) AS col2_upper FROM source_table ORDER BY col1) TO 'test_files/scratch/copy/table_metrics/' STORED AS PARQUET;
+----
+Plan with Metrics
+01)DataSinkExec: sink=ParquetSink(file_groups=[]), metrics=[elapsed_compute=<slt:ignore>, bytes_written=<slt:ignore>, rows_written=2]
+02)--SortExec: expr=[col1@0 ASC NULLS LAST], preserve_partitioning=[false], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batches_split=0]
+03)----ProjectionExec: expr=[col1@0 as col1, upper(col2@1) as col2_upper], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
+04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
+
 # Copy to directory as partitioned files with keep_partition_by_columns enabled
 query I
 COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to