This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6368daf0b6 Implement `partition_statistics` API for `RepartitionExec` 
(#17061)
6368daf0b6 is described below

commit 6368daf0b60611d5c6388b21def4d623b4c71fdc
Author: Liam Bao <liam.zw....@gmail.com>
AuthorDate: Sun Aug 24 21:51:51 2025 -0400

    Implement `partition_statistics` API for `RepartitionExec` (#17061)
    
    * Implement `partition_statistics` API for `RepartitionExec`
    
    * Test execution
    
    * Change the partition number for the test
    
    * Make all column stats absent
    
    * Use `ColumnStatistics::new_unknown()`
    
    * Add test case for 0 partitions
    
    * Return unknown statistics for 0 partitions
---
 .../physical_optimizer/partition_statistics.rs     | 102 +++++++++++++++++++++
 datafusion/physical-plan/src/repartition/mod.rs    |  43 ++++++++-
 2 files changed, 141 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index bfc09340cc..df1032e065 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -33,6 +33,7 @@ mod test {
     use datafusion_functions_aggregate::count::count_udaf;
     use datafusion_physical_expr::aggregate::AggregateExprBuilder;
     use datafusion_physical_expr::expressions::{binary, col, lit, Column};
+    use datafusion_physical_expr::Partitioning;
     use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
     use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
     use datafusion_physical_plan::aggregates::{
@@ -47,6 +48,7 @@ mod test {
     use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
     use datafusion_physical_plan::projection::ProjectionExec;
+    use datafusion_physical_plan::repartition::RepartitionExec;
     use datafusion_physical_plan::sorts::sort::SortExec;
     use datafusion_physical_plan::union::UnionExec;
     use datafusion_physical_plan::{
@@ -761,4 +763,104 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_repartition() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+
+        let repartition = Arc::new(RepartitionExec::try_new(
+            scan.clone(),
+            Partitioning::RoundRobinBatch(3),
+        )?);
+
+        let statistics = (0..repartition.partitioning().partition_count())
+            .map(|idx| repartition.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+        assert_eq!(statistics.len(), 3);
+
+        let expected_stats = Statistics {
+            num_rows: Precision::Inexact(1),
+            total_byte_size: Precision::Inexact(73),
+            column_statistics: vec![
+                ColumnStatistics::new_unknown(),
+                ColumnStatistics::new_unknown(),
+            ],
+        };
+
+        // All partitions should have the same statistics
+        for stat in statistics.iter() {
+            assert_eq!(stat, &expected_stats);
+        }
+
+        // Verify that the result has exactly 3 partitions
+        let partitions = execute_stream_partitioned(
+            repartition.clone(),
+            Arc::new(TaskContext::default()),
+        )?;
+        assert_eq!(partitions.len(), 3);
+
+        // Collect row counts from each partition
+        let mut partition_row_counts = Vec::new();
+        for partition_stream in partitions.into_iter() {
+            let results: Vec<RecordBatch> = partition_stream.try_collect().await?;
+            let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+            partition_row_counts.push(total_rows);
+        }
+        assert_eq!(partition_row_counts.len(), 3);
+        assert_eq!(partition_row_counts[0], 2);
+        assert_eq!(partition_row_counts[1], 2);
+        assert_eq!(partition_row_counts[2], 0);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_repartition_invalid_partition() -> Result<()>
+    {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+
+        let repartition = Arc::new(RepartitionExec::try_new(
+            scan.clone(),
+            Partitioning::RoundRobinBatch(2),
+        )?);
+
+        let result = repartition.partition_statistics(Some(2));
+        assert!(result.is_err());
+        let error = result.unwrap_err();
+        assert!(error
+            .to_string()
+            .contains("RepartitionExec invalid partition 2 (expected less than 2)"));
+
+        let partitions = execute_stream_partitioned(
+            repartition.clone(),
+            Arc::new(TaskContext::default()),
+        )?;
+        assert_eq!(partitions.len(), 2);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_repartition_zero_partitions() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let scan_schema = scan.schema();
+
+        // Create a repartition with 0 partitions
+        let repartition = Arc::new(RepartitionExec::try_new(
+            Arc::new(EmptyExec::new(scan_schema.clone())),
+            Partitioning::RoundRobinBatch(0),
+        )?);
+
+        let result = repartition.partition_statistics(Some(0))?;
+        assert_eq!(result, Statistics::new_unknown(&scan_schema));
+
+        // Verify that the result has exactly 0 partitions
+        let partitions = execute_stream_partitioned(
+            repartition.clone(),
+            Arc::new(TaskContext::default()),
+        )?;
+        assert_eq!(partitions.len(), 0);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 754a208126..3cd6ee6c1a 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -45,8 +45,9 @@ use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
 use arrow::compute::take_arrays;
 use arrow::datatypes::{SchemaRef, UInt32Type};
 use datafusion_common::config::ConfigOptions;
+use datafusion_common::stats::Precision;
 use datafusion_common::utils::transpose;
-use datafusion_common::{internal_err, HashMap};
+use datafusion_common::{internal_err, ColumnStatistics, HashMap};
 use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::MemoryConsumer;
@@ -755,10 +756,44 @@ impl ExecutionPlan for RepartitionExec {
     }
 
     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
-        if partition.is_none() {
-            self.input.partition_statistics(None)
+        if let Some(partition) = partition {
+            let partition_count = self.partitioning().partition_count();
+            if partition_count == 0 {
+                return Ok(Statistics::new_unknown(&self.schema()));
+            }
+
+            if partition >= partition_count {
+                return internal_err!(
+                    "RepartitionExec invalid partition {} (expected less than {})",
+                    partition,
+                    self.partitioning().partition_count()
+                );
+            }
+
+            let mut stats = self.input.partition_statistics(None)?;
+
+            // Distribute statistics across partitions
+            stats.num_rows = stats
+                .num_rows
+                .get_value()
+                .map(|rows| Precision::Inexact(rows / partition_count))
+                .unwrap_or(Precision::Absent);
+            stats.total_byte_size = stats
+                .total_byte_size
+                .get_value()
+                .map(|bytes| Precision::Inexact(bytes / partition_count))
+                .unwrap_or(Precision::Absent);
+
+            // Make all column stats unknown
+            stats.column_statistics = stats
+                .column_statistics
+                .iter()
+                .map(|_| ColumnStatistics::new_unknown())
+                .collect();
+
+            Ok(stats)
         } else {
-            Ok(Statistics::new_unknown(&self.schema()))
+            self.input.partition_statistics(None)
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to