This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 37b9a466c4 feat: `partition_statistics()` for HashJoinExec (#20711)
37b9a466c4 is described below

commit 37b9a466c42e472cb6b86783f294f9f734af3d05
Author: Jonathan Chen <[email protected]>
AuthorDate: Mon Mar 9 03:42:08 2026 -0500

    feat: `partition_statistics()` for HashJoinExec (#20711)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #15873.
    
    ## Rationale for this change
    Copy of https://github.com/apache/datafusion/pull/16956
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 .../physical_optimizer/partition_statistics.rs     | 253 ++++++++++++++++++++-
 .../physical-plan/src/joins/hash_join/exec.rs      |  60 +++--
 2 files changed, 298 insertions(+), 15 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index fa021ed3dc..a03792fae8 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -27,7 +27,9 @@ mod test {
     use datafusion_catalog::TableProvider;
     use datafusion_common::Result;
     use datafusion_common::stats::Precision;
-    use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+    use datafusion_common::{
+        ColumnStatistics, JoinType, NullEquality, ScalarValue, Statistics,
+    };
     use datafusion_execution::TaskContext;
     use datafusion_execution::config::SessionConfig;
     use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
@@ -45,7 +47,9 @@ mod test {
     use datafusion_physical_plan::common::compute_record_batch_statistics;
     use datafusion_physical_plan::empty::EmptyExec;
     use datafusion_physical_plan::filter::FilterExec;
-    use datafusion_physical_plan::joins::{CrossJoinExec, NestedLoopJoinExec};
+    use datafusion_physical_plan::joins::{
+        CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
+    };
     use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
     use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
@@ -1354,4 +1358,249 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_hash_join_partition_statistics() -> Result<()> {
+        // Create left table scan and coalesce to 1 partition for CollectLeft mode
+        let left_scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let left_scan_coalesced = Arc::new(CoalescePartitionsExec::new(left_scan.clone()))
+            as Arc<dyn ExecutionPlan>;
+
+        // Create right table scan with different table name
+        let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL, date DATE) \
+                                     STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\
+                                     PARTITIONED BY (date) \
+                                     WITH ORDER (id ASC);";
+        let right_scan =
+            create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await;
+
+        // Create join condition: t1.id = t2.id
+        let on = vec![(
+            Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>,
+            Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>,
+        )];
+
+        // Test CollectLeft mode - left child must have 1 partition
+        let collect_left_join = Arc::new(HashJoinExec::try_new(
+            left_scan_coalesced,
+            Arc::clone(&right_scan),
+            on.clone(),
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            NullEquality::NullEqualsNothing,
+            false,
+        )?) as Arc<dyn ExecutionPlan>;
+
+        // Test partition statistics for CollectLeft mode
+        let statistics = (0..collect_left_join.output_partitioning().partition_count())
+            .map(|idx| collect_left_join.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Check that we have the expected number of partitions
+        assert_eq!(statistics.len(), 2);
+
+        // For collect left mode, the min/max values are from the entire left table and the specific partition of the right table.
+        let expected_p0_statistics = Statistics {
+            num_rows: Precision::Inexact(2),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                // Left id column: all partitions (id 1..4)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(16),
+                },
+                // Left date column: all partitions (2025-03-01..2025-03-04)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_04,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_01,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(16),
+                },
+                // Right id column: partition 0 only (id 3..4)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                // Right date column: partition 0 only (2025-03-01..2025-03-02)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_02,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_01,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+            ],
+        };
+        assert_eq!(statistics[0], expected_p0_statistics);
+
+        // Test Partitioned mode
+        let partitioned_join = Arc::new(HashJoinExec::try_new(
+            Arc::clone(&left_scan),
+            Arc::clone(&right_scan),
+            on.clone(),
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            NullEquality::NullEqualsNothing,
+            false,
+        )?) as Arc<dyn ExecutionPlan>;
+
+        // Test partition statistics for Partitioned mode
+        let statistics = (0..partitioned_join.output_partitioning().partition_count())
+            .map(|idx| partitioned_join.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Check that we have the expected number of partitions
+        assert_eq!(statistics.len(), 2);
+
+        // For partitioned mode, the min/max values are from the specific partition for each side.
+        let expected_p0_statistics = Statistics {
+            num_rows: Precision::Inexact(2),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                // Left id column: partition 0 only (id 3..4)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                // Left date column: partition 0 only (2025-03-01..2025-03-02)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_02,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_01,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                // Right id column: partition 0 only (id 3..4)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                // Right date column: partition 0 only (2025-03-01..2025-03-02)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_02,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_01,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+            ],
+        };
+        assert_eq!(statistics[0], expected_p0_statistics);
+
+        // Test Auto mode - should fall back to getting all partition statistics
+        let auto_join = Arc::new(HashJoinExec::try_new(
+            Arc::clone(&left_scan),
+            Arc::clone(&right_scan),
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Auto,
+            NullEquality::NullEqualsNothing,
+            false,
+        )?) as Arc<dyn ExecutionPlan>;
+
+        // Test partition statistics for Auto mode
+        let statistics = (0..auto_join.output_partitioning().partition_count())
+            .map(|idx| auto_join.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Check that we have the expected number of partitions
+        assert_eq!(statistics.len(), 2);
+
+        // For auto mode, the min/max values are from the entire left and right tables.
+        let expected_p0_statistics = Statistics {
+            num_rows: Precision::Inexact(4),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                // Left id column: all partitions (id 1..4)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(16),
+                },
+                // Left date column: all partitions (2025-03-01..2025-03-04)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_04,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_01,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(16),
+                },
+                // Right id column: all partitions (id 1..4)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(16),
+                },
+                // Right date column: all partitions (2025-03-01..2025-03-04)
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_04,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_01,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(16),
+                },
+            ],
+        };
+        assert_eq!(statistics[0], expected_p0_statistics);
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index b5e8d3c1e3..c07ae2d651 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1442,19 +1442,53 @@ impl ExecutionPlan for HashJoinExec {
     }
 
     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
-        if partition.is_some() {
-            return Ok(Statistics::new_unknown(&self.schema()));
-        }
-        // TODO stats: it is not possible in general to know the output size of joins
-        // There are some special cases though, for example:
-        // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
-        let stats = estimate_join_statistics(
-            self.left.partition_statistics(None)?,
-            self.right.partition_statistics(None)?,
-            &self.on,
-            &self.join_type,
-            &self.join_schema,
-        )?;
+        let stats = match (partition, self.mode) {
+            // For CollectLeft mode, the left side is collected into a single partition,
+            // so all left partitions are available to each output partition.
+            // For the right side, we need the specific partition statistics.
+            (Some(partition), PartitionMode::CollectLeft) => {
+                let left_stats = self.left.partition_statistics(None)?;
+                let right_stats = self.right.partition_statistics(Some(partition))?;
+
+                estimate_join_statistics(
+                    left_stats,
+                    right_stats,
+                    &self.on,
+                    &self.join_type,
+                    &self.join_schema,
+                )?
+            }
+
+            // For Partitioned mode, both sides are partitioned, so each output partition
+            // only has access to the corresponding partition from both sides.
+            (Some(partition), PartitionMode::Partitioned) => {
+                let left_stats = self.left.partition_statistics(Some(partition))?;
+                let right_stats = self.right.partition_statistics(Some(partition))?;
+
+                estimate_join_statistics(
+                    left_stats,
+                    right_stats,
+                    &self.on,
+                    &self.join_type,
+                    &self.join_schema,
+                )?
+            }
+
+            // For Auto mode or when no specific partition is requested, fall back to
+            // the current behavior of getting all partition statistics.
+            (None, _) | (Some(_), PartitionMode::Auto) => {
+                // TODO stats: it is not possible in general to know the output size of joins
+                // There are some special cases though, for example:
+                // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
+                estimate_join_statistics(
+                    self.left.partition_statistics(None)?,
+                    self.right.partition_statistics(None)?,
+                    &self.on,
+                    &self.join_type,
+                    &self.join_schema,
+                )?
+            }
+        };
         // Project statistics if there is a projection
         let stats = stats.project(self.projection.as_ref());
         // Apply fetch limit to statistics


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to