This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 6368daf0b6 Implement `partition_statistics` API for `RepartitionExec` (#17061) 6368daf0b6 is described below commit 6368daf0b60611d5c6388b21def4d623b4c71fdc Author: Liam Bao <liam.zw....@gmail.com> AuthorDate: Sun Aug 24 21:51:51 2025 -0400 Implement `partition_statistics` API for `RepartitionExec` (#17061) * Implement `partition_statistics` API for `RepartitionExec` * Test execution * Change the partition number for the test * Make all column stats absent * Use `ColumnStatistics::new_unknown()` * Add test case for 0 partitions * Return unknown statistics for 0 partitions --- .../physical_optimizer/partition_statistics.rs | 102 +++++++++++++++++++++ datafusion/physical-plan/src/repartition/mod.rs | 43 ++++++++- 2 files changed, 141 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index bfc09340cc..df1032e065 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -33,6 +33,7 @@ mod test { use datafusion_functions_aggregate::count::count_udaf; use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::{binary, col, lit, Column}; + use datafusion_physical_expr::Partitioning; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::aggregates::{ @@ -47,6 +48,7 @@ mod test { use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::ProjectionExec; + use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::union::UnionExec; use 
datafusion_physical_plan::{ @@ -761,4 +763,104 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_statistic_by_partition_of_repartition() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + + let repartition = Arc::new(RepartitionExec::try_new( + scan.clone(), + Partitioning::RoundRobinBatch(3), + )?); + + let statistics = (0..repartition.partitioning().partition_count()) + .map(|idx| repartition.partition_statistics(Some(idx))) + .collect::<Result<Vec<_>>>()?; + assert_eq!(statistics.len(), 3); + + let expected_stats = Statistics { + num_rows: Precision::Inexact(1), + total_byte_size: Precision::Inexact(73), + column_statistics: vec![ + ColumnStatistics::new_unknown(), + ColumnStatistics::new_unknown(), + ], + }; + + // All partitions should have the same statistics + for stat in statistics.iter() { + assert_eq!(stat, &expected_stats); + } + + // Verify that the result has exactly 3 partitions + let partitions = execute_stream_partitioned( + repartition.clone(), + Arc::new(TaskContext::default()), + )?; + assert_eq!(partitions.len(), 3); + + // Collect row counts from each partition + let mut partition_row_counts = Vec::new(); + for partition_stream in partitions.into_iter() { + let results: Vec<RecordBatch> = partition_stream.try_collect().await?; + let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum(); + partition_row_counts.push(total_rows); + } + assert_eq!(partition_row_counts.len(), 3); + assert_eq!(partition_row_counts[0], 2); + assert_eq!(partition_row_counts[1], 2); + assert_eq!(partition_row_counts[2], 0); + + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_repartition_invalid_partition() -> Result<()> + { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + + let repartition = Arc::new(RepartitionExec::try_new( + scan.clone(), + Partitioning::RoundRobinBatch(2), + )?); + + let result = repartition.partition_statistics(Some(2)); + 
assert!(result.is_err()); + let error = result.unwrap_err(); + assert!(error + .to_string() + .contains("RepartitionExec invalid partition 2 (expected less than 2)")); + + let partitions = execute_stream_partitioned( + repartition.clone(), + Arc::new(TaskContext::default()), + )?; + assert_eq!(partitions.len(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_repartition_zero_partitions() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let scan_schema = scan.schema(); + + // Create a repartition with 0 partitions + let repartition = Arc::new(RepartitionExec::try_new( + Arc::new(EmptyExec::new(scan_schema.clone())), + Partitioning::RoundRobinBatch(0), + )?); + + let result = repartition.partition_statistics(Some(0))?; + assert_eq!(result, Statistics::new_unknown(&scan_schema)); + + // Verify that the result has exactly 0 partitions + let partitions = execute_stream_partitioned( + repartition.clone(), + Arc::new(TaskContext::default()), + )?; + assert_eq!(partitions.len(), 0); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 754a208126..3cd6ee6c1a 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -45,8 +45,9 @@ use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; use arrow::compute::take_arrays; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; +use datafusion_common::stats::Precision; use datafusion_common::utils::transpose; -use datafusion_common::{internal_err, HashMap}; +use datafusion_common::{internal_err, ColumnStatistics, HashMap}; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::MemoryConsumer; @@ -755,10 +756,44 @@ impl ExecutionPlan for RepartitionExec { } fn 
partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { - if partition.is_none() { - self.input.partition_statistics(None) + if let Some(partition) = partition { + let partition_count = self.partitioning().partition_count(); + if partition_count == 0 { + return Ok(Statistics::new_unknown(&self.schema())); + } + + if partition >= partition_count { + return internal_err!( + "RepartitionExec invalid partition {} (expected less than {})", + partition, + self.partitioning().partition_count() + ); + } + + let mut stats = self.input.partition_statistics(None)?; + + // Distribute statistics across partitions + stats.num_rows = stats + .num_rows + .get_value() + .map(|rows| Precision::Inexact(rows / partition_count)) + .unwrap_or(Precision::Absent); + stats.total_byte_size = stats + .total_byte_size + .get_value() + .map(|bytes| Precision::Inexact(bytes / partition_count)) + .unwrap_or(Precision::Absent); + + // Make all column stats unknown + stats.column_statistics = stats + .column_statistics + .iter() + .map(|_| ColumnStatistics::new_unknown()) + .collect(); + + Ok(stats) } else { - Ok(Statistics::new_unknown(&self.schema())) + self.input.partition_statistics(None) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org