This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9611ac80d1 Implement `partition_statistics` API for `InterleaveExec` (#17051)
9611ac80d1 is described below
commit 9611ac80d1c26d4860a8dba109baf6fa2bbc35af
Author: Liam Bao <[email protected]>
AuthorDate: Thu Oct 2 01:16:28 2025 -0400
Implement `partition_statistics` API for `InterleaveExec` (#17051)
* Implement `partition_statistics` API for `InterleaveExec`
* Add tests for hash repartitioning
---
.../physical_optimizer/partition_statistics.rs | 110 ++++++++++++++++++++-
datafusion/physical-plan/src/union.rs | 5 +-
2 files changed, 108 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index a7b06bc7be..183c09c82d 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -50,7 +50,7 @@ mod test {
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
- use datafusion_physical_plan::union::UnionExec;
+ use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
use datafusion_physical_plan::{
execute_stream_partitioned, get_plan_string, ExecutionPlan,
ExecutionPlanProperties,
@@ -68,6 +68,7 @@ mod test {
/// - Second partition: [1, 2]
/// - Each row is 110 bytes in size
///
+ /// @param create_table_sql Optional parameter to set the create table SQL
/// @param target_partition Optional parameter to set the target partitions
/// @return ExecutionPlan representing the scan of the table with statistics
async fn create_scan_exec_with_statistics(
@@ -387,6 +388,64 @@ mod test {
Ok(())
}
+ #[tokio::test]
+ async fn test_statistics_by_partition_of_interleave() -> Result<()> {
+ let scan1 = create_scan_exec_with_statistics(None, Some(1)).await;
+ let scan2 = create_scan_exec_with_statistics(None, Some(1)).await;
+
+ // Create same hash partitioning on the 'id' column as InterleaveExec
+ // requires all children have a consistent hash partitioning
+ let hash_expr1 = vec![col("id", &scan1.schema())?];
+ let repartition1 = Arc::new(RepartitionExec::try_new(
+ scan1,
+ Partitioning::Hash(hash_expr1, 2),
+ )?);
+ let hash_expr2 = vec![col("id", &scan2.schema())?];
+ let repartition2 = Arc::new(RepartitionExec::try_new(
+ scan2,
+ Partitioning::Hash(hash_expr2, 2),
+ )?);
+
+ let interleave: Arc<dyn ExecutionPlan> =
+ Arc::new(InterleaveExec::try_new(vec![repartition1, repartition2])?);
+
+ // Verify the result of partition statistics
+ let stats = (0..interleave.output_partitioning().partition_count())
+ .map(|idx| interleave.partition_statistics(Some(idx)))
+ .collect::<Result<Vec<_>>>()?;
+ assert_eq!(stats.len(), 2);
+
+ let expected_stats = Statistics {
+ num_rows: Precision::Inexact(4),
+ total_byte_size: Precision::Inexact(220),
+ column_statistics: vec![
+ ColumnStatistics::new_unknown(),
+ ColumnStatistics::new_unknown(),
+ ],
+ };
+ assert_eq!(stats[0], expected_stats);
+ assert_eq!(stats[1], expected_stats);
+
+ // Verify the execution results
+ let partitions = execute_stream_partitioned(
+ interleave.clone(),
+ Arc::new(TaskContext::default()),
+ )?;
+ assert_eq!(partitions.len(), 2);
+
+ let mut partition_row_counts = Vec::new();
+ for partition_stream in partitions.into_iter() {
+ let results: Vec<RecordBatch> = partition_stream.try_collect().await?;
+ let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+ partition_row_counts.push(total_rows);
+ }
+ assert_eq!(partition_row_counts.len(), 2);
+ assert_eq!(partition_row_counts[0], 2);
+ assert_eq!(partition_row_counts[1], 6);
+
+ Ok(())
+ }
+
#[tokio::test]
async fn test_statistic_by_partition_of_cross_join() -> Result<()> {
let left_scan = create_scan_exec_with_statistics(None, Some(1)).await;
@@ -439,7 +498,6 @@ mod test {
#[tokio::test]
async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> {
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
- dbg!(scan.partition_statistics(Some(0))?);
let coalesce_batches: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(scan, 2));
let expected_statistic_partition_1 =
@@ -562,7 +620,6 @@ mod test {
let _ = plan_string.swap_remove(1);
let expected_plan = vec![
"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr],
aggr=[COUNT(c)]",
- //" DataSourceExec: file_groups={2 groups:
[[.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]},
projection=[id, d [...]
];
assert_eq!(plan_string, expected_plan);
@@ -874,4 +931,51 @@ mod test {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_statistic_by_partition_of_repartition_hash_partitioning() -> Result<()>
+ {
+ let scan = create_scan_exec_with_statistics(None, Some(1)).await;
+
+ // Create hash partitioning on the 'id' column
+ let hash_expr = vec![col("id", &scan.schema())?];
+ let repartition = Arc::new(RepartitionExec::try_new(
+ scan,
+ Partitioning::Hash(hash_expr, 2),
+ )?);
+
+ // Verify the result of partition statistics of repartition
+ let stats = (0..repartition.partitioning().partition_count())
+ .map(|idx| repartition.partition_statistics(Some(idx)))
+ .collect::<Result<Vec<_>>>()?;
+ assert_eq!(stats.len(), 2);
+
+ let expected_stats = Statistics {
+ num_rows: Precision::Inexact(2),
+ total_byte_size: Precision::Inexact(110),
+ column_statistics: vec![
+ ColumnStatistics::new_unknown(),
+ ColumnStatistics::new_unknown(),
+ ],
+ };
+ assert_eq!(stats[0], expected_stats);
+ assert_eq!(stats[1], expected_stats);
+
+ // Verify the repartition execution results
+ let partitions =
+ execute_stream_partitioned(repartition, Arc::new(TaskContext::default()))?;
+ assert_eq!(partitions.len(), 2);
+
+ let mut partition_row_counts = Vec::new();
+ for partition_stream in partitions.into_iter() {
+ let results: Vec<RecordBatch> = partition_stream.try_collect().await?;
+ let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+ partition_row_counts.push(total_rows);
+ }
+ assert_eq!(partition_row_counts.len(), 2);
+ assert_eq!(partition_row_counts[0], 1);
+ assert_eq!(partition_row_counts[1], 3);
+
+ Ok(())
+ }
}
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index b4591f46f0..a0ccada2f9 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -531,13 +531,10 @@ impl ExecutionPlan for InterleaveExec {
}
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
- if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema()));
- }
let stats = self
.inputs
.iter()
- .map(|stat| stat.partition_statistics(None))
+ .map(|stat| stat.partition_statistics(partition))
.collect::<Result<Vec<_>>>()?;
Ok(stats
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]