This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9611ac80d1 Implement `partition_statistics` API for `InterleaveExec` (#17051)
9611ac80d1 is described below

commit 9611ac80d1c26d4860a8dba109baf6fa2bbc35af
Author: Liam Bao <[email protected]>
AuthorDate: Thu Oct 2 01:16:28 2025 -0400

    Implement `partition_statistics` API for `InterleaveExec` (#17051)
    
    * Implement `partition_statistics` API for `InterleaveExec`
    
    * Add tests for hash repartitioning
---
 .../physical_optimizer/partition_statistics.rs     | 110 ++++++++++++++++++++-
 datafusion/physical-plan/src/union.rs              |   5 +-
 2 files changed, 108 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index a7b06bc7be..183c09c82d 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -50,7 +50,7 @@ mod test {
     use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
     use datafusion_physical_plan::repartition::RepartitionExec;
     use datafusion_physical_plan::sorts::sort::SortExec;
-    use datafusion_physical_plan::union::UnionExec;
+    use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
     use datafusion_physical_plan::{
         execute_stream_partitioned, get_plan_string, ExecutionPlan,
         ExecutionPlanProperties,
@@ -68,6 +68,7 @@ mod test {
     ///   - Second partition: [1, 2]
     /// - Each row is 110 bytes in size
     ///
+    /// @param create_table_sql Optional parameter to set the create table SQL
     /// @param target_partition Optional parameter to set the target partitions
     /// @return ExecutionPlan representing the scan of the table with statistics
     async fn create_scan_exec_with_statistics(
@@ -387,6 +388,64 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_statistics_by_partition_of_interleave() -> Result<()> {
+        let scan1 = create_scan_exec_with_statistics(None, Some(1)).await;
+        let scan2 = create_scan_exec_with_statistics(None, Some(1)).await;
+
+        // Create same hash partitioning on the 'id' column as InterleaveExec
+        // requires all children have a consistent hash partitioning
+        let hash_expr1 = vec![col("id", &scan1.schema())?];
+        let repartition1 = Arc::new(RepartitionExec::try_new(
+            scan1,
+            Partitioning::Hash(hash_expr1, 2),
+        )?);
+        let hash_expr2 = vec![col("id", &scan2.schema())?];
+        let repartition2 = Arc::new(RepartitionExec::try_new(
+            scan2,
+            Partitioning::Hash(hash_expr2, 2),
+        )?);
+
+        let interleave: Arc<dyn ExecutionPlan> =
+            Arc::new(InterleaveExec::try_new(vec![repartition1, 
repartition2])?);
+
+        // Verify the result of partition statistics
+        let stats = (0..interleave.output_partitioning().partition_count())
+            .map(|idx| interleave.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+        assert_eq!(stats.len(), 2);
+
+        let expected_stats = Statistics {
+            num_rows: Precision::Inexact(4),
+            total_byte_size: Precision::Inexact(220),
+            column_statistics: vec![
+                ColumnStatistics::new_unknown(),
+                ColumnStatistics::new_unknown(),
+            ],
+        };
+        assert_eq!(stats[0], expected_stats);
+        assert_eq!(stats[1], expected_stats);
+
+        // Verify the execution results
+        let partitions = execute_stream_partitioned(
+            interleave.clone(),
+            Arc::new(TaskContext::default()),
+        )?;
+        assert_eq!(partitions.len(), 2);
+
+        let mut partition_row_counts = Vec::new();
+        for partition_stream in partitions.into_iter() {
+            let results: Vec<RecordBatch> = 
partition_stream.try_collect().await?;
+            let total_rows: usize = results.iter().map(|batch| 
batch.num_rows()).sum();
+            partition_row_counts.push(total_rows);
+        }
+        assert_eq!(partition_row_counts.len(), 2);
+        assert_eq!(partition_row_counts[0], 2);
+        assert_eq!(partition_row_counts[1], 6);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_statistic_by_partition_of_cross_join() -> Result<()> {
         let left_scan = create_scan_exec_with_statistics(None, Some(1)).await;
@@ -439,7 +498,6 @@ mod test {
     #[tokio::test]
     async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> {
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
-        dbg!(scan.partition_statistics(Some(0))?);
         let coalesce_batches: Arc<dyn ExecutionPlan> =
             Arc::new(CoalesceBatchesExec::new(scan, 2));
         let expected_statistic_partition_1 =
@@ -562,7 +620,6 @@ mod test {
         let _ = plan_string.swap_remove(1);
         let expected_plan = vec![
             "AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], 
aggr=[COUNT(c)]",
-            //"  DataSourceExec: file_groups={2 groups: 
[[.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
 
.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
 
[.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
 
.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]},
 projection=[id, d [...]
         ];
         assert_eq!(plan_string, expected_plan);
 
@@ -874,4 +931,51 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_repartition_hash_partitioning() -> 
Result<()>
+    {
+        let scan = create_scan_exec_with_statistics(None, Some(1)).await;
+
+        // Create hash partitioning on the 'id' column
+        let hash_expr = vec![col("id", &scan.schema())?];
+        let repartition = Arc::new(RepartitionExec::try_new(
+            scan,
+            Partitioning::Hash(hash_expr, 2),
+        )?);
+
+        // Verify the result of partition statistics of repartition
+        let stats = (0..repartition.partitioning().partition_count())
+            .map(|idx| repartition.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+        assert_eq!(stats.len(), 2);
+
+        let expected_stats = Statistics {
+            num_rows: Precision::Inexact(2),
+            total_byte_size: Precision::Inexact(110),
+            column_statistics: vec![
+                ColumnStatistics::new_unknown(),
+                ColumnStatistics::new_unknown(),
+            ],
+        };
+        assert_eq!(stats[0], expected_stats);
+        assert_eq!(stats[1], expected_stats);
+
+        // Verify the repartition execution results
+        let partitions =
+            execute_stream_partitioned(repartition, 
Arc::new(TaskContext::default()))?;
+        assert_eq!(partitions.len(), 2);
+
+        let mut partition_row_counts = Vec::new();
+        for partition_stream in partitions.into_iter() {
+            let results: Vec<RecordBatch> = 
partition_stream.try_collect().await?;
+            let total_rows: usize = results.iter().map(|batch| 
batch.num_rows()).sum();
+            partition_row_counts.push(total_rows);
+        }
+        assert_eq!(partition_row_counts.len(), 2);
+        assert_eq!(partition_row_counts[0], 1);
+        assert_eq!(partition_row_counts[1], 3);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index b4591f46f0..a0ccada2f9 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -531,13 +531,10 @@ impl ExecutionPlan for InterleaveExec {
     }
 
     fn partition_statistics(&self, partition: Option<usize>) -> 
Result<Statistics> {
-        if partition.is_some() {
-            return Ok(Statistics::new_unknown(&self.schema()));
-        }
         let stats = self
             .inputs
             .iter()
-            .map(|stat| stat.partition_statistics(None))
+            .map(|stat| stat.partition_statistics(partition))
             .collect::<Result<Vec<_>>>()?;
 
         Ok(stats


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to