This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 44756da53 fix(join-selection): guard CollectLeft swap when right has
multiple partitions (#1691)
44756da53 is described below
commit 44756da539c02d192d9a28e6c3d0e5ff9cae6f1c
Author: Andy Grove <[email protected]>
AuthorDate: Wed May 13 03:21:43 2026 -0600
fix(join-selection): guard CollectLeft swap when right has multiple
partitions (#1691)
* fix(join-selection): guard CollectLeft swap when right has multiple
partitions
The Ballista-customized JoinSelection could swap a HashJoinExec(CollectLeft)
join's inputs based on byte-size statistics alone, putting a multi-partition
reader on the build side. Ballista has no post-JoinSelection distribution
pass, so the swapped plan reaches the executor and trips
HashJoinExec::execute's assertion that left_partitions == 1 in CollectLeft
mode. This surfaces on TPC-H Q2 with prefer_hash_join=true, where stage 11's
broadcast reader happens to be larger in bytes than its hash probe reader,
so should_swap_join_order returns true and the swap is performed.
Guard try_collect_left's two swap arms with
right.output_partitioning().partition_count() == 1 so that a swap into
CollectLeft only fires when the post-swap left input satisfies the invariant.
Add a regression test that builds a CollectLeft join with a 1-partition,
heavier left and a 2-partition, lighter right and asserts the post-optimize
plan still has a 1-partition build side.
Closes #1681
* address minor comments
---------
Co-authored-by: Marko Milenković <[email protected]>
---
.../src/physical_optimizer/join_selection.rs | 119 ++++++++++++++++++++-
1 file changed, 118 insertions(+), 1 deletion(-)
diff --git a/ballista/scheduler/src/physical_optimizer/join_selection.rs
b/ballista/scheduler/src/physical_optimizer/join_selection.rs
index 24a9b5c59..b50b0f3c2 100644
--- a/ballista/scheduler/src/physical_optimizer/join_selection.rs
+++ b/ballista/scheduler/src/physical_optimizer/join_selection.rs
@@ -237,11 +237,17 @@ pub(crate) fn try_collect_left(
threshold_num_rows,
);
+ // Swapping into `CollectLeft` puts the current right onto the build side,
+ // and `HashJoinExec` requires the build side to have exactly one partition
+ // in that mode. Ballista has no post-`JoinSelection` distribution pass to
+ // fix this, so any swap that would break the invariant must be skipped.
+ let right_one_partition = right.output_partitioning().partition_count() ==
1;
match (left_can_collect, right_can_collect) {
(true, true) => {
// Don't swap null-aware anti joins as they have specific side
requirements
if hash_join.join_type().supports_swap()
&& !hash_join.null_aware
+ && right_one_partition
&& should_swap_join_order(&**left, &**right)?
{
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
@@ -262,7 +268,10 @@ pub(crate) fn try_collect_left(
))),
(false, true) => {
// Don't swap null-aware anti joins as they have specific side
requirements
- if hash_join.join_type().supports_swap() && !hash_join.null_aware {
+ if hash_join.join_type().supports_swap()
+ && !hash_join.null_aware
+ && right_one_partition
+ {
hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
} else {
Ok(None)
@@ -743,6 +752,114 @@ mod test {
optimizer_options.hash_join_single_partition_threshold,
)
}
+    // Regression for https://github.com/apache/datafusion-ballista/issues/1681
+    //
+    // `JoinSelection` must not swap a `HashJoinExec(CollectLeft)` join's
+    // inputs in a way that puts a multi-partition reader on the build side.
+    // Ballista's pipeline has no `EnforceDistribution` pass after this rule,
+    // so the swapped plan reaches the executor unchanged and trips
+    // `HashJoinExec::execute`'s assertion that `left_partitions == 1` in
+    // `CollectLeft` mode.
+    //
+    // The fixture makes the single-partition left input heavier in
+    // `total_byte_size` than the multi-partition right input. That is the
+    // situation in which the byte-size heuristic favours a swap, so only the
+    // partition-count guard keeps the build side valid.
+    #[tokio::test]
+    async fn collect_left_swap_preserves_one_partition_build() {
+        use datafusion::{
+            common::NullEquality,
+            config::ConfigOptions,
+            physical_expr::expressions::Column,
+            physical_optimizer::PhysicalOptimizerRule,
+            physical_plan::{
+                ExecutionPlanProperties, coalesce_partitions::CoalescePartitionsExec,
+                joins::HashJoinExec, joins::PartitionMode,
+            },
+        };
+
+        use crate::physical_optimizer::join_selection::JoinSelection;
+
+        let schema = Schema::new(vec![Field::new("k", DataType::Int32, false)]);
+
+        // Build side: 1 logical partition (e.g. a Ballista broadcast
+        // ShuffleReaderExec) but BIG total_byte_size.
+        let big_inner = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(10_000),
+                total_byte_size: Precision::Exact(1_000_000),
+                column_statistics: vec![ColumnStatistics::new_unknown()],
+            },
+            schema.clone(),
+        ));
+        let left =
+            Arc::new(CoalescePartitionsExec::new(big_inner)) as Arc<dyn ExecutionPlan>;
+        assert_eq!(left.output_partitioning().partition_count(), 1);
+
+        // Probe side: 2 partitions (the raw StatisticsExec default), SMALL
+        // total_byte_size. Stand-in for a multi-partition hash shuffle reader.
+        let right = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(3_000),
+                total_byte_size: Precision::Exact(200_000),
+                column_statistics: vec![ColumnStatistics::new_unknown()],
+            },
+            schema.clone(),
+        )) as Arc<dyn ExecutionPlan>;
+        // Record the probe fan-out so the final check does not silently
+        // depend on `StatisticsExec`'s default partition count.
+        let right_partitions = right.output_partitioning().partition_count();
+        assert!(right_partitions > 1);
+
+        let on = vec![(
+            Arc::new(Column::new_with_schema("k", &left.schema()).unwrap()) as _,
+            Arc::new(Column::new_with_schema("k", &right.schema()).unwrap()) as _,
+        )];
+
+        let join = Arc::new(
+            HashJoinExec::try_new(
+                Arc::clone(&left),
+                Arc::clone(&right),
+                on,
+                None,
+                &JoinType::Inner,
+                None,
+                PartitionMode::CollectLeft,
+                NullEquality::NullEqualsNothing,
+                false,
+            )
+            .unwrap(),
+        ) as Arc<dyn ExecutionPlan>;
+
+        let optimized = JoinSelection::new()
+            .optimize(join, &ConfigOptions::new())
+            .unwrap();
+
+        // `swap_inputs` for Inner wraps the join in a ProjectionExec to
+        // restore the output column order. Walk the tree to find the join.
+        fn find_hash_join(plan: &Arc<dyn ExecutionPlan>) -> Option<&HashJoinExec> {
+            if let Some(hj) = plan.as_any().downcast_ref::<HashJoinExec>() {
+                return Some(hj);
+            }
+            for child in plan.children() {
+                if let Some(hj) = find_hash_join(child) {
+                    return Some(hj);
+                }
+            }
+            None
+        }
+
+        let hj =
+            find_hash_join(&optimized).expect("HashJoinExec missing from optimized plan");
+
+        assert_eq!(*hj.partition_mode(), PartitionMode::CollectLeft);
+        assert_eq!(
+            hj.left().output_partitioning().partition_count(),
+            1,
+            "JoinSelection swapped a CollectLeft join's inputs and ended up \
+             with a multi-partition reader on the build side, which violates \
+             CollectLeft's invariant",
+        );
+        // A distinct message, so a failure here is not misreported as the
+        // build-side violation checked above.
+        assert_eq!(
+            hj.right().output_partitioning().partition_count(),
+            right_partitions,
+            "the multi-partition probe input should remain on the right side \
+             of the CollectLeft join after JoinSelection",
+        );
+    }
/// Create join filter for NLJoinExec with expression `big_col > small_col`
/// where both columns are 0-indexed and come from left and right inputs
respectively
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]