xudong963 commented on code in PR #20307:
URL: https://github.com/apache/datafusion/pull/20307#discussion_r2869756600
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -1410,12 +1424,18 @@ impl FileSink for ParquetSink {
}
}
- let mut row_count = 0;
while let Some(result) = file_write_tasks.join_next().await {
match result {
Ok(r) => {
let (path, parquet_meta_data) = r?;
- row_count += parquet_meta_data.file_metadata().num_rows();
+ let file_rows =
parquet_meta_data.file_metadata().num_rows() as usize;
+ let file_bytes: usize = parquet_meta_data
+ .row_groups()
+ .iter()
+ .map(|rg| rg.compressed_size() as usize)
+ .sum();
+ rows_written_counter.add(file_rows);
+ bytes_written_counter.add(file_bytes);
Review Comment:
Good point — I think `bytes_written` is the intuitive name that users expect
for a sink metric. I'd like to keep the name `bytes_written` but add a brief
doc comment explaining what it measures.
(Renaming it to `compressed_row_group_bytes` would be overly verbose and would
surprise users familiar with standard sink metrics.)
##########
datafusion/core/src/dataframe/parquet.rs:
##########
@@ -430,6 +430,149 @@ mod tests {
Ok(())
}
+ /// Test that ParquetSink exposes rows_written, bytes_written, and
+ /// elapsed_compute metrics via DataSinkExec.
+ #[tokio::test]
+ async fn test_parquet_sink_metrics() -> Result<()> {
+ use arrow::array::Int32Array;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::record_batch::RecordBatch;
+ use datafusion_execution::TaskContext;
+
+ use futures::TryStreamExt;
+
+ let ctx = SessionContext::new();
+ let tmp_dir = TempDir::new()?;
+ let output_path = tmp_dir.path().join("metrics_test.parquet");
+ let output_path_str = output_path.to_str().unwrap();
+
+ // Register a table with 100 rows
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("val", DataType::Int32, false),
+ ]));
+ let ids: Vec<i32> = (0..100).collect();
+ let vals: Vec<i32> = (100..200).collect();
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(vals)),
+ ],
+ )?;
+ ctx.register_batch("source", batch)?;
+
+ // Create the physical plan for COPY TO
+ let df = ctx
+ .sql(&format!(
+ "COPY source TO '{output_path_str}' STORED AS PARQUET"
+ ))
+ .await?;
+ let plan = df.create_physical_plan().await?;
+
+ // Execute the plan
+ let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+ let stream = plan.execute(0, task_ctx)?;
+ let _batches: Vec<_> = stream.try_collect().await?;
+
+ // Check metrics on the DataSinkExec (top-level plan)
+ let metrics = plan
+ .metrics()
+ .expect("DataSinkExec should return metrics from ParquetSink");
+ let aggregated = metrics.aggregate_by_name();
+
+ // rows_written should be 100
+ let rows_written = aggregated
+ .iter()
+ .find(|m| m.value().name() == "rows_written")
+ .expect("should have rows_written metric");
Review Comment:
Good point!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]