This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 4c6a1488 fix: Issue with `JoinSelection` and `CrossJoinExec` when stages have been resolved (#1322)
4c6a1488 is described below
commit 4c6a1488d041f44b51bd4f1171ea6ba85e596c36
Author: Andrew <[email protected]>
AuthorDate: Fri Sep 19 23:39:31 2025 +0800
fix: Issue with `JoinSelection` and `CrossJoinExec` when stages have been resolved (#1322)
* fix: Issue with JoinSelection and CrossJoinExec when stage is resolved
#1321
* fix clippy
---------
Co-authored-by: Marko Milenković <[email protected]>
---
.../src/physical_optimizer/join_selection.rs | 36 +++++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
diff --git a/ballista/scheduler/src/physical_optimizer/join_selection.rs b/ballista/scheduler/src/physical_optimizer/join_selection.rs
index 2107e9dc..3fb4d60b 100644
--- a/ballista/scheduler/src/physical_optimizer/join_selection.rs
+++ b/ballista/scheduler/src/physical_optimizer/join_selection.rs
@@ -346,7 +346,9 @@ fn statistical_join_selection_subrule(
} else if let Some(cross_join) =
plan.as_any().downcast_ref::<CrossJoinExec>() {
let left = cross_join.left();
let right = cross_join.right();
- if should_swap_join_order(&**left, &**right)? {
+ if right.properties().output_partitioning().partition_count() > 1 {
+ None
+ } else if should_swap_join_order(&**left, &**right)? {
cross_join.swap_inputs().map(Some)?
} else {
None
@@ -670,6 +672,38 @@ mod test {
);
}
+ //
+ // JoinSelection must not swap CrossJoinExec inputs when the right side
+ // has more than one partition: the swapped plan would need a new
+ // CoalescePartitionsExec, which cannot be inserted once stages are created.
+ // NOTE(review): assumes create_big_and_small() yields a multi-partition `small`.
+ #[tokio::test]
+ async fn test_cross_join_with_swap() {
+ use std::sync::Arc;
+
+ use datafusion::{
+ config::ConfigOptions,
+ physical_optimizer::PhysicalOptimizerRule,
+ physical_plan::{displayable, joins::CrossJoinExec},
+ };
+
+ use crate::physical_optimizer::join_selection::JoinSelection;
+
+ let (big, small) = create_big_and_small();
+
+ let join = Arc::new(CrossJoinExec::new(Arc::clone(&big),
Arc::clone(&small)))
+ as Arc<dyn ExecutionPlan>;
+
+ let optimized_join = JoinSelection::new()
+ .optimize(join.clone(), &ConfigOptions::new())
+ .unwrap();
+
+ assert_eq!(
+ displayable(join.as_ref()).one_line().to_string(),
+ displayable(optimized_join.as_ref()).one_line().to_string()
+ );
+ }
+
fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn
ExecutionPlan>) {
let big = Arc::new(StatisticsExec::new(
big_statistics(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]