This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 44756da53 fix(join-selection): guard CollectLeft swap when right has multiple partitions (#1691)
44756da53 is described below

commit 44756da539c02d192d9a28e6c3d0e5ff9cae6f1c
Author: Andy Grove <[email protected]>
AuthorDate: Wed May 13 03:21:43 2026 -0600

    fix(join-selection): guard CollectLeft swap when right has multiple partitions (#1691)
    
    * fix(join-selection): guard CollectLeft swap when right has multiple partitions
    
    The Ballista-customized JoinSelection could swap a HashJoinExec(CollectLeft)
    join's inputs based on byte-size statistics alone, putting a multi-partition
    reader on the build side. Ballista has no post-JoinSelection distribution
    pass, so the swapped plan reaches the executor and trips
    HashJoinExec::execute's assertion that left_partitions == 1 in CollectLeft
    mode. This surfaces on TPC-H Q2 with prefer_hash_join=true, where stage 11's
    broadcast reader happens to be larger in bytes than its hash probe reader,
    so should_swap_join_order returns true and the swap is performed.
    
    Guard try_collect_left's two swap arms with
    right.output_partitioning().partition_count() == 1 so a swap into
    CollectLeft only fires when the post-swap left will satisfy the invariant.
    
    Add a regression test that builds a CollectLeft join with a 1-partition,
    heavier left and a 2-partition, lighter right and asserts the post-optimize
    plan still has a 1-partition build side.
    
    Closes #1681
    
    * address minor comments
    
    ---------
    
    Co-authored-by: Marko Milenković <[email protected]>
---
 .../src/physical_optimizer/join_selection.rs       | 119 ++++++++++++++++++++-
 1 file changed, 118 insertions(+), 1 deletion(-)

diff --git a/ballista/scheduler/src/physical_optimizer/join_selection.rs 
b/ballista/scheduler/src/physical_optimizer/join_selection.rs
index 24a9b5c59..b50b0f3c2 100644
--- a/ballista/scheduler/src/physical_optimizer/join_selection.rs
+++ b/ballista/scheduler/src/physical_optimizer/join_selection.rs
@@ -237,11 +237,17 @@ pub(crate) fn try_collect_left(
             threshold_num_rows,
         );
 
+    // Swapping into `CollectLeft` puts the current right onto the build side,
+    // and `HashJoinExec` requires the build side to have exactly one partition
+    // in that mode. Ballista has no post-`JoinSelection` distribution pass to
+    // fix this, so any swap that would break the invariant must be skipped.
+    let right_one_partition = right.output_partitioning().partition_count() == 
1;
     match (left_can_collect, right_can_collect) {
         (true, true) => {
             // Don't swap null-aware anti joins as they have specific side 
requirements
             if hash_join.join_type().supports_swap()
                 && !hash_join.null_aware
+                && right_one_partition
                 && should_swap_join_order(&**left, &**right)?
             {
                 Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
@@ -262,7 +268,10 @@ pub(crate) fn try_collect_left(
         ))),
         (false, true) => {
             // Don't swap null-aware anti joins as they have specific side 
requirements
-            if hash_join.join_type().supports_swap() && !hash_join.null_aware {
+            if hash_join.join_type().supports_swap()
+                && !hash_join.null_aware
+                && right_one_partition
+            {
                 hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
             } else {
                 Ok(None)
@@ -743,6 +752,114 @@ mod test {
             optimizer_options.hash_join_single_partition_threshold,
         )
     }
+    // Regression for https://github.com/apache/datafusion-ballista/issues/1681
+    //
+    // `JoinSelection` must not swap a `HashJoinExec(CollectLeft)` join's
+    // inputs in a way that puts a multi-partition reader on the build side.
+    // Ballista's pipeline has no `EnforceDistribution` pass after this rule,
+    // so the swapped plan reaches the executor unchanged and trips
+    // `HashJoinExec::execute`'s assertion that `left_partitions == 1` in
+    // `CollectLeft` mode.
+    #[tokio::test]
+    async fn collect_left_swap_preserves_one_partition_build() {
+        use datafusion::{
+            common::NullEquality,
+            config::ConfigOptions,
+            physical_expr::expressions::Column,
+            physical_optimizer::PhysicalOptimizerRule,
+            physical_plan::{
+                ExecutionPlanProperties, 
coalesce_partitions::CoalescePartitionsExec,
+                joins::HashJoinExec, joins::PartitionMode,
+            },
+        };
+
+        use crate::physical_optimizer::join_selection::JoinSelection;
+
+        let schema = Schema::new(vec![Field::new("k", DataType::Int32, 
false)]);
+
+        // Build side: 1 logical partition (e.g. a Ballista broadcast
+        // ShuffleReaderExec) but BIG total_byte_size.
+        let big_inner = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(10_000),
+                total_byte_size: Precision::Exact(1_000_000),
+                column_statistics: vec![ColumnStatistics::new_unknown()],
+            },
+            schema.clone(),
+        ));
+        let left =
+            Arc::new(CoalescePartitionsExec::new(big_inner)) as Arc<dyn 
ExecutionPlan>;
+        assert_eq!(left.output_partitioning().partition_count(), 1);
+
+        // Probe side: 2 partitions (the raw StatisticsExec default), SMALL
+        // total_byte_size. Stand-in for a multi-partition hash shuffle reader.
+        let right = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(3_000),
+                total_byte_size: Precision::Exact(200_000),
+                column_statistics: vec![ColumnStatistics::new_unknown()],
+            },
+            schema.clone(),
+        )) as Arc<dyn ExecutionPlan>;
+        assert!(right.output_partitioning().partition_count() > 1);
+
+        let on = vec![(
+            Arc::new(Column::new_with_schema("k", &left.schema()).unwrap()) as 
_,
+            Arc::new(Column::new_with_schema("k", &right.schema()).unwrap()) 
as _,
+        )];
+
+        let join = Arc::new(
+            HashJoinExec::try_new(
+                Arc::clone(&left),
+                Arc::clone(&right),
+                on,
+                None,
+                &JoinType::Inner,
+                None,
+                PartitionMode::CollectLeft,
+                NullEquality::NullEqualsNothing,
+                false,
+            )
+            .unwrap(),
+        ) as Arc<dyn ExecutionPlan>;
+
+        let optimized = JoinSelection::new()
+            .optimize(join, &ConfigOptions::new())
+            .unwrap();
+
+        // `swap_inputs` for Inner wraps the join in a ProjectionExec to
+        // restore the output column order. Walk the tree to find the join.
+        fn find_hash_join(plan: &Arc<dyn ExecutionPlan>) -> 
Option<&HashJoinExec> {
+            if let Some(hj) = plan.as_any().downcast_ref::<HashJoinExec>() {
+                return Some(hj);
+            }
+            for child in plan.children() {
+                if let Some(hj) = find_hash_join(child) {
+                    return Some(hj);
+                }
+            }
+            None
+        }
+
+        let hj =
+            find_hash_join(&optimized).expect("HashJoinExec missing from 
optimized plan");
+
+        assert_eq!(*hj.partition_mode(), PartitionMode::CollectLeft);
+        assert_eq!(
+            hj.left().output_partitioning().partition_count(),
+            1,
+            "JoinSelection swapped a CollectLeft join's inputs and ended up \
+             with a multi-partition reader on the build side, which violates \
+             CollectLeft's invariant",
+        );
+        assert_eq!(
+            hj.right().output_partitioning().partition_count(),
+            2,
+            "JoinSelection swapped a CollectLeft join's inputs and ended up \
+             with a multi-partition reader on the build side, which violates \
+             CollectLeft's invariant",
+        );
+    }
 
     /// Create join filter for NLJoinExec with expression `big_col > small_col`
     /// where both columns are 0-indexed and come from left and right inputs 
respectively


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to