This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c6a1488 fix: Issue with `JoinSelection` and `CrossJoinExec` when 
stages have been resolved (#1322)
4c6a1488 is described below

commit 4c6a1488d041f44b51bd4f1171ea6ba85e596c36
Author: Andrew <[email protected]>
AuthorDate: Fri Sep 19 23:39:31 2025 +0800

    fix: Issue with `JoinSelection` and `CrossJoinExec` when stages have been 
resolved (#1322)
    
    * fix: Issue with JoinSelection and CrossJoinExec when stage is resolved 
#1321
    
    * fix clippy
    
    ---------
    
    Co-authored-by: Marko Milenković <[email protected]>
---
 .../src/physical_optimizer/join_selection.rs       | 36 +++++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/ballista/scheduler/src/physical_optimizer/join_selection.rs 
b/ballista/scheduler/src/physical_optimizer/join_selection.rs
index 2107e9dc..3fb4d60b 100644
--- a/ballista/scheduler/src/physical_optimizer/join_selection.rs
+++ b/ballista/scheduler/src/physical_optimizer/join_selection.rs
@@ -346,7 +346,9 @@ fn statistical_join_selection_subrule(
         } else if let Some(cross_join) = 
plan.as_any().downcast_ref::<CrossJoinExec>() {
             let left = cross_join.left();
             let right = cross_join.right();
-            if should_swap_join_order(&**left, &**right)? {
+            if right.properties().output_partitioning().partition_count() > 1 {
+                None
+            } else if should_swap_join_order(&**left, &**right)? {
                 cross_join.swap_inputs().map(Some)?
             } else {
                 None
@@ -670,6 +672,38 @@ mod test {
         );
     }
 
+    //
+    // join selection should not change order of joins for
+    // cross join as we're not able to insert new CoalescePartitions
+    // when stages are created
+    //
+    #[tokio::test]
+    async fn test_cross_join_with_swap() {
+        use std::sync::Arc;
+
+        use datafusion::{
+            config::ConfigOptions,
+            physical_optimizer::PhysicalOptimizerRule,
+            physical_plan::{displayable, joins::CrossJoinExec},
+        };
+
+        use crate::physical_optimizer::join_selection::JoinSelection;
+
+        let (big, small) = create_big_and_small();
+
+        let join = Arc::new(CrossJoinExec::new(Arc::clone(&big), 
Arc::clone(&small)))
+            as Arc<dyn ExecutionPlan>;
+
+        let optimized_join = JoinSelection::new()
+            .optimize(join.clone(), &ConfigOptions::new())
+            .unwrap();
+
+        assert_eq!(
+            displayable(join.as_ref()).one_line().to_string(),
+            displayable(optimized_join.as_ref()).one_line().to_string()
+        );
+    }
+
     fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn 
ExecutionPlan>) {
         let big = Arc::new(StatisticsExec::new(
             big_statistics(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to