This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 99f2666a18 [Refactor] PipelineFixer physical optimizer rule removal 
(#7059)
99f2666a18 is described below

commit 99f2666a18d289ac64444fc813560762a90a0037
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Tue Jul 25 19:12:13 2023 +0300

    [Refactor] PipelineFixer physical optimizer rule removal (#7059)
    
    * Initial refactor
    
    * Minor
    
    * Update datafusion/core/src/physical_optimizer/join_selection.rs
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
    
    * Update according to comments
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../core/src/physical_optimizer/join_selection.rs  | 915 ++++++++++++++++++---
 datafusion/core/src/physical_optimizer/mod.rs      |   1 -
 .../core/src/physical_optimizer/optimizer.rs       |   7 -
 .../core/src/physical_optimizer/pipeline_fixer.rs  | 716 ----------------
 datafusion/core/tests/memory_limit.rs              |   4 +-
 .../tests/sqllogictests/test_files/explain.slt     |   1 -
 6 files changed, 802 insertions(+), 842 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs
index a9dec73c36..9ecfb8993f 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -15,36 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Select the proper PartitionMode and build side based on the avaliable 
statistics for hash join.
-use std::sync::Arc;
+//! The [`JoinSelection`] rule tries to modify a given plan so that it can
+//! accommodate infinite sources and utilize statistical information (if there
+//! is any) to obtain more performant plans. To achieve the first goal, it
+//! tries to transform a non-runnable query (with the given infinite sources)
+//! into a runnable query by replacing pipeline-breaking join operations with
+//! pipeline-friendly ones. To achieve the second goal, it selects the proper
+//! `PartitionMode` and the build side using the available statistics for hash 
joins.
 
-use arrow::datatypes::Schema;
+use std::sync::Arc;
 
 use crate::config::ConfigOptions;
-use crate::logical_expr::JoinType;
-use crate::physical_plan::expressions::Column;
+use crate::error::Result;
+use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
 use crate::physical_plan::joins::{
-    utils::{ColumnIndex, JoinFilter, JoinSide},
-    CrossJoinExec, HashJoinExec, PartitionMode,
+    CrossJoinExec, HashJoinExec, PartitionMode, StreamJoinPartitionMode,
+    SymmetricHashJoinExec,
 };
 use crate::physical_plan::projection::ProjectionExec;
-use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+use crate::physical_plan::ExecutionPlan;
 
-use super::optimizer::PhysicalOptimizerRule;
-use crate::error::Result;
+use arrow_schema::Schema;
 use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{DataFusionError, JoinType};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::PhysicalExpr;
 
-/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection 
rule will make
-/// a cost based decision to select which PartitionMode 
mode(Partitioned/CollectLeft) is optimal
-/// based on the available statistics that the inputs have.
-/// If the statistics information is not available, the partition mode will 
fall back to [PartitionMode::Partitioned].
-///
-/// JoinSelection rule will also reorder the build and probe phase of the hash 
joins
-/// based on the avaliable statistics that the inputs have.
-/// The rule optimizes the order such that the left (build) side of the join 
is the smallest.
-/// If the statistics information is not available, the order stays the same 
as the original query.
-/// JoinSelection rule will also swap the left and right sides for cross join 
to keep the left side
-/// is the smallest.
+/// The [`JoinSelection`] rule tries to modify a given plan so that it can
+/// accommodate infinite sources and optimize joins in the plan according to
+/// available statistical information, if there is any.
 #[derive(Default)]
 pub struct JoinSelection {}
 
@@ -55,8 +56,9 @@ impl JoinSelection {
     }
 }
 
-// TODO we need some performance test for Right Semi/Right Join swap to Left 
Semi/Left Join in case that the right side is smaller but not much smaller.
-// TODO In PrestoSQL, the optimizer flips join sides only if one side is much 
smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default 
is is 8 times.
+// TODO: We need some performance test for Right Semi/Right Join swap to Left 
Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO: In PrestoSQL, the optimizer flips join sides only if one side is much 
smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default 
it is 8 times.
+/// Checks statistics for join swap.
 fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) 
-> bool {
     // Get the left and right table's total bytes
     // If both the left and right tables contain total_byte_size statistics,
@@ -89,8 +91,9 @@ fn supports_collect_by_size(
         false
     }
 }
+
 /// Predicate that checks whether the given join type supports input swapping.
-pub fn supports_swap(join_type: JoinType) -> bool {
+fn supports_swap(join_type: JoinType) -> bool {
     matches!(
         join_type,
         JoinType::Inner
@@ -103,9 +106,10 @@ pub fn supports_swap(join_type: JoinType) -> bool {
             | JoinType::RightAnti
     )
 }
+
 /// This function returns the new join type we get after swapping the given
 /// join's inputs.
-pub fn swap_join_type(join_type: JoinType) -> JoinType {
+fn swap_join_type(join_type: JoinType) -> JoinType {
     match join_type {
         JoinType::Inner => JoinType::Inner,
         JoinType::Full => JoinType::Full,
@@ -119,7 +123,7 @@ pub fn swap_join_type(join_type: JoinType) -> JoinType {
 }
 
 /// This function swaps the inputs of the given join operator.
-pub fn swap_hash_join(
+fn swap_hash_join(
     hash_join: &HashJoinExec,
     partition_mode: PartitionMode,
 ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -160,7 +164,7 @@ pub fn swap_hash_join(
 /// the output should not be impacted. This function creates the expressions
 /// that will allow to swap back the values from the original left as the first
 /// columns and those on the right next.
-pub fn swap_reverting_projection(
+fn swap_reverting_projection(
     left_schema: &Schema,
     right_schema: &Schema,
 ) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
@@ -182,30 +186,26 @@ pub fn swap_reverting_projection(
 }
 
 /// Swaps join sides for filter column indices and produces new JoinFilter
-fn swap_join_filter(filter: Option<&JoinFilter>) -> Option<JoinFilter> {
-    filter.map(|filter| {
-        let column_indices = filter
-            .column_indices()
-            .iter()
-            .map(|idx| {
-                let side = if matches!(idx.side, JoinSide::Left) {
-                    JoinSide::Right
-                } else {
-                    JoinSide::Left
-                };
-                ColumnIndex {
-                    index: idx.index,
-                    side,
-                }
-            })
-            .collect();
+fn swap_filter(filter: &JoinFilter) -> JoinFilter {
+    let column_indices = filter
+        .column_indices()
+        .iter()
+        .map(|idx| ColumnIndex {
+            index: idx.index,
+            side: idx.side.negate(),
+        })
+        .collect();
 
-        JoinFilter::new(
-            filter.expression().clone(),
-            column_indices,
-            filter.schema().clone(),
-        )
-    })
+    JoinFilter::new(
+        filter.expression().clone(),
+        column_indices,
+        filter.schema().clone(),
+    )
+}
+
+/// Swaps join sides for filter column indices and produces new `JoinFilter` 
(if exists).
+fn swap_join_filter(filter: Option<&JoinFilter>) -> Option<JoinFilter> {
+    filter.map(swap_filter)
 }
 
 impl PhysicalOptimizerRule for JoinSelection {
@@ -214,63 +214,32 @@ impl PhysicalOptimizerRule for JoinSelection {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let pipeline = PipelineStatePropagator::new(plan);
+        // First, we make pipeline-fixing modifications to joins so as to 
accommodate
+        // unbounded inputs. Each pipeline-fixing subrule, which is a function
+        // of type `PipelineFixerSubrule`, takes a single 
[`PipelineStatePropagator`]
+        // argument storing state variables that indicate the unboundedness 
status
+        // of the current [`ExecutionPlan`] as we traverse the plan tree.
+        let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
+            Box::new(hash_join_convert_symmetric_subrule),
+            Box::new(hash_join_swap_subrule),
+        ];
+        let state = pipeline.transform_up(&|p| apply_subrules(p, &subrules, 
config))?;
+        // Next, we apply another subrule that tries to optimize joins using 
any
+        // statistics their inputs might have.
+        // - For a hash join with partition mode [`PartitionMode::Auto`], we 
will
+        //   make a cost-based decision to select which `PartitionMode` mode
+        //   (`Partitioned`/`CollectLeft`) is optimal. If the statistics 
information
+        //   is not available, we will fall back to 
[`PartitionMode::Partitioned`].
+        // - We optimize/swap join sides so that the left (build) side of the 
join
+        //   is the small side. If the statistics information is not 
available, we
+        //   do not modify join sides.
+        // - We will also swap left and right sides for cross joins so that 
the left
+        //   side is the small side.
         let config = &config.optimizer;
         let collect_left_threshold = 
config.hash_join_single_partition_threshold;
-        plan.transform_up(&|plan| {
-            let transformed = if let Some(hash_join) =
-                plan.as_any().downcast_ref::<HashJoinExec>()
-            {
-                match hash_join.partition_mode() {
-                    PartitionMode::Auto => {
-                        try_collect_left(hash_join, 
Some(collect_left_threshold))?
-                            .map_or_else(
-                                || partitioned_hash_join(hash_join).map(Some),
-                                |v| Ok(Some(v)),
-                            )?
-                    }
-                    PartitionMode::CollectLeft => try_collect_left(hash_join, 
None)?
-                        .map_or_else(
-                            || partitioned_hash_join(hash_join).map(Some),
-                            |v| Ok(Some(v)),
-                        )?,
-                    PartitionMode::Partitioned => {
-                        let left = hash_join.left();
-                        let right = hash_join.right();
-                        if should_swap_join_order(&**left, &**right)
-                            && supports_swap(*hash_join.join_type())
-                        {
-                            swap_hash_join(hash_join, 
PartitionMode::Partitioned)
-                                .map(Some)?
-                        } else {
-                            None
-                        }
-                    }
-                }
-            } else if let Some(cross_join) = 
plan.as_any().downcast_ref::<CrossJoinExec>()
-            {
-                let left = cross_join.left();
-                let right = cross_join.right();
-                if should_swap_join_order(&**left, &**right) {
-                    let new_join =
-                        CrossJoinExec::new(Arc::clone(right), 
Arc::clone(left));
-                    // TODO avoid adding ProjectionExec again and again, only 
adding Final Projection
-                    let proj: Arc<dyn ExecutionPlan> = 
Arc::new(ProjectionExec::try_new(
-                        swap_reverting_projection(&left.schema(), 
&right.schema()),
-                        Arc::new(new_join),
-                    )?);
-                    Some(proj)
-                } else {
-                    None
-                }
-            } else {
-                None
-            };
-
-            Ok(if let Some(transformed) = transformed {
-                Transformed::Yes(transformed)
-            } else {
-                Transformed::No(plan)
-            })
+        state.plan.transform_up(&|plan| {
+            statistical_join_selection_subrule(plan, collect_left_threshold)
         })
     }
 
@@ -283,13 +252,17 @@ impl PhysicalOptimizerRule for JoinSelection {
     }
 }
 
-/// Try to create the PartitionMode::CollectLeft HashJoinExec when possible.
-/// The method will first consider the current join type and check whether it 
is applicable to run CollectLeft mode
-/// and will try to swap the join if the orignal type is unapplicable to run 
CollectLeft.
-/// When the collect_threshold is provided, the method will also check both 
the left side and right side sizes
+/// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when 
possible.
 ///
-/// For [JoinType::Full], it is alway unable to run CollectLeft mode and will 
return None.
-/// For [JoinType::Left] and [JoinType::LeftAnti], can not run CollectLeft 
mode, should swap join type to [JoinType::Right] and [JoinType::RightAnti]
+/// This function will first consider the given join type and check whether the
+/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join 
sides.
+/// When the `collect_threshold` is provided, this function will also check 
left
+/// and right sizes.
+///
+/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return 
`None`.
+/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run 
`CollectLeft`
+/// mode as is, but it can do so by changing the join type to 
[`JoinType::Right`]
+/// and [`JoinType::RightAnti`], respectively.
 fn try_collect_left(
     hash_join: &HashJoinExec,
     collect_threshold: Option<usize>,
@@ -375,8 +348,238 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> 
Result<Arc<dyn ExecutionPl
     }
 }
 
+/// This subrule tries to modify a given plan so that it can
+/// optimize hash and cross joins in the plan according to available 
statistical information.
+fn statistical_join_selection_subrule(
+    plan: Arc<dyn ExecutionPlan>,
+    collect_left_threshold: usize,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let transformed = if let Some(hash_join) =
+        plan.as_any().downcast_ref::<HashJoinExec>()
+    {
+        match hash_join.partition_mode() {
+            PartitionMode::Auto => {
+                try_collect_left(hash_join, 
Some(collect_left_threshold))?.map_or_else(
+                    || partitioned_hash_join(hash_join).map(Some),
+                    |v| Ok(Some(v)),
+                )?
+            }
+            PartitionMode::CollectLeft => try_collect_left(hash_join, None)?
+                .map_or_else(
+                    || partitioned_hash_join(hash_join).map(Some),
+                    |v| Ok(Some(v)),
+                )?,
+            PartitionMode::Partitioned => {
+                let left = hash_join.left();
+                let right = hash_join.right();
+                if should_swap_join_order(&**left, &**right)
+                    && supports_swap(*hash_join.join_type())
+                {
+                    swap_hash_join(hash_join, 
PartitionMode::Partitioned).map(Some)?
+                } else {
+                    None
+                }
+            }
+        }
+    } else if let Some(cross_join) = 
plan.as_any().downcast_ref::<CrossJoinExec>() {
+        let left = cross_join.left();
+        let right = cross_join.right();
+        if should_swap_join_order(&**left, &**right) {
+            let new_join = CrossJoinExec::new(Arc::clone(right), 
Arc::clone(left));
+            // TODO avoid adding ProjectionExec again and again, only adding 
Final Projection
+            let proj: Arc<dyn ExecutionPlan> = 
Arc::new(ProjectionExec::try_new(
+                swap_reverting_projection(&left.schema(), &right.schema()),
+                Arc::new(new_join),
+            )?);
+            Some(proj)
+        } else {
+            None
+        }
+    } else {
+        None
+    };
+
+    Ok(if let Some(transformed) = transformed {
+        Transformed::Yes(transformed)
+    } else {
+        Transformed::No(plan)
+    })
+}
+
+/// Pipeline-fixing join selection subrule.
+pub type PipelineFixerSubrule = dyn Fn(
+    PipelineStatePropagator,
+    &ConfigOptions,
+) -> Option<Result<PipelineStatePropagator>>;
+
+/// This subrule checks if we can replace a hash join with a symmetric hash
+/// join when we are dealing with infinite inputs on both sides. This change
+/// avoids pipeline breaking and preserves query runnability. If possible,
+/// this subrule makes this replacement; otherwise, it has no effect.
+fn hash_join_convert_symmetric_subrule(
+    mut input: PipelineStatePropagator,
+    config_options: &ConfigOptions,
+) -> Option<Result<PipelineStatePropagator>> {
+    if let Some(hash_join) = 
input.plan.as_any().downcast_ref::<HashJoinExec>() {
+        let ub_flags = &input.children_unbounded;
+        let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
+        input.unbounded = left_unbounded || right_unbounded;
+        let result = if left_unbounded && right_unbounded {
+            let mode = if config_options.optimizer.repartition_joins {
+                StreamJoinPartitionMode::Partitioned
+            } else {
+                StreamJoinPartitionMode::SinglePartition
+            };
+            SymmetricHashJoinExec::try_new(
+                hash_join.left().clone(),
+                hash_join.right().clone(),
+                hash_join.on().to_vec(),
+                hash_join.filter().cloned(),
+                hash_join.join_type(),
+                hash_join.null_equals_null(),
+                mode,
+            )
+            .map(|exec| {
+                input.plan = Arc::new(exec) as _;
+                input
+            })
+        } else {
+            Ok(input)
+        };
+        Some(result)
+    } else {
+        None
+    }
+}
+
+/// This subrule will swap build/probe sides of a hash join depending on 
whether
+/// one of its inputs may produce an infinite stream of records. The rule 
ensures
+/// that the left (build) side of the hash join always operates on an input 
stream
+/// that will produce a finite set of records. If the left side can not be 
chosen
+/// to be "finite", the join sides stay the same as the original query.
+/// ```text
+/// For example, this rule makes the following transformation:
+///
+///
+///
+///           +--------------+              +--------------+
+///           |              |  unbounded   |              |
+///    Left   | Infinite     |    true      | Hash         |\true
+///           | Data source  |--------------| Repartition  | \   
+--------------+       +--------------+
+///           |              |              |              |  \  |             
 |       |              |
+///           +--------------+              +--------------+   - |  Hash Join  
 |-------| Projection   |
+///                                                            - |             
 |       |              |
+///           +--------------+              +--------------+  /  
+--------------+       +--------------+
+///           |              |  unbounded   |              | /
+///    Right  | Finite       |    false     | Hash         |/false
+///           | Data Source  |--------------| Repartition  |
+///           |              |              |              |
+///           +--------------+              +--------------+
+///
+///
+///
+///           +--------------+              +--------------+
+///           |              |  unbounded   |              |
+///    Left   | Finite       |    false     | Hash         |\false
+///           | Data source  |--------------| Repartition  | \   
+--------------+       +--------------+
+///           |              |              |              |  \  |             
 | true  |              | true
+///           +--------------+              +--------------+   - |  Hash Join  
 |-------| Projection   |-----
+///                                                            - |             
 |       |              |
+///           +--------------+              +--------------+  /  
+--------------+       +--------------+
+///           |              |  unbounded   |              | /
+///    Right  | Infinite     |    true      | Hash         |/true
+///           | Data Source  |--------------| Repartition  |
+///           |              |              |              |
+///           +--------------+              +--------------+
+///
+/// ```
+fn hash_join_swap_subrule(
+    mut input: PipelineStatePropagator,
+    _config_options: &ConfigOptions,
+) -> Option<Result<PipelineStatePropagator>> {
+    if let Some(hash_join) = 
input.plan.as_any().downcast_ref::<HashJoinExec>() {
+        let ub_flags = &input.children_unbounded;
+        let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
+        input.unbounded = left_unbounded || right_unbounded;
+        let result = if left_unbounded
+            && !right_unbounded
+            && matches!(
+                *hash_join.join_type(),
+                JoinType::Inner
+                    | JoinType::Left
+                    | JoinType::LeftSemi
+                    | JoinType::LeftAnti
+            ) {
+            swap_join_according_to_unboundedness(hash_join).map(|plan| {
+                input.plan = plan;
+                input
+            })
+        } else {
+            Ok(input)
+        };
+        Some(result)
+    } else {
+        None
+    }
+}
+
+/// This function swaps sides of a hash join to make it runnable even if one of
+/// its inputs is infinite. Note that this is not always possible; i.e.
+/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
+/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
+/// we swap join sides. Therefore, we do not consider them here.
+fn swap_join_according_to_unboundedness(
+    hash_join: &HashJoinExec,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let partition_mode = hash_join.partition_mode();
+    let join_type = hash_join.join_type();
+    match (*partition_mode, *join_type) {
+        (
+            _,
+            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | 
JoinType::Full,
+        ) => Err(DataFusionError::Internal(format!(
+            "{join_type} join cannot be swapped for unbounded input."
+        ))),
+        (PartitionMode::Partitioned, _) => {
+            swap_hash_join(hash_join, PartitionMode::Partitioned)
+        }
+        (PartitionMode::CollectLeft, _) => {
+            swap_hash_join(hash_join, PartitionMode::CollectLeft)
+        }
+        (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
+            "Auto is not acceptable for unbounded input here.".to_string(),
+        )),
+    }
+}
+
+/// Apply given `PipelineFixerSubrule`s to a given plan. This plan, along with
+/// auxiliary boundedness information, is in the `PipelineStatePropagator` 
object.
+fn apply_subrules(
+    mut input: PipelineStatePropagator,
+    subrules: &Vec<Box<PipelineFixerSubrule>>,
+    config_options: &ConfigOptions,
+) -> Result<Transformed<PipelineStatePropagator>> {
+    for subrule in subrules {
+        if let Some(value) = subrule(input.clone(), 
config_options).transpose()? {
+            input = value;
+        }
+    }
+    let is_unbounded = input
+        .plan
+        .unbounded_output(&input.children_unbounded)
+        // Treat the case where an operator can not run on unbounded data as
+        // if it can and it outputs unbounded data. Do not raise an error yet.
+        // Such operators may be fixed, adjusted or replaced later on during
+        // optimization passes -- sorts may be removed, windows may be adjusted
+        // etc. If this doesn't happen, the final `PipelineChecker` rule will
+        // catch this and raise an error anyway.
+        .unwrap_or(true);
+    input.unbounded = is_unbounded;
+    Ok(Transformed::Yes(input))
+}
+
 #[cfg(test)]
-mod tests {
+mod tests_statistical {
     use crate::{
         physical_plan::{
             displayable, joins::PartitionMode, ColumnStatistics, Statistics,
@@ -388,7 +591,9 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion_common::ScalarValue;
+    use datafusion_common::{JoinType, ScalarValue};
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr::PhysicalExpr;
 
     fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn 
ExecutionPlan>) {
         let big = Arc::new(StatisticsExec::new(
@@ -556,7 +761,6 @@ mod tests {
             .expect("A proj is required to swap columns back to their original 
order");
 
         assert_eq!(swapping_projection.expr().len(), 2);
-        println!("swapping_projection {swapping_projection:?}");
         let (col, name) = &swapping_projection.expr()[0];
         assert_eq!(name, "small_col");
         assert_col_expr(col, "small_col", 1);
@@ -693,7 +897,7 @@ mod tests {
             "        StatisticsExec: col_count=1, row_count=Some(1000)",
             "        StatisticsExec: col_count=1, row_count=Some(100000)",
             "    StatisticsExec: col_count=1, row_count=Some(10000)",
-            ""
+            "",
         ];
         assert_optimized!(expected, join);
     }
@@ -967,3 +1171,484 @@ mod tests {
         }
     }
 }
+
+#[cfg(test)]
+mod util_tests {
+    use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::{BinaryExpr, Column, 
NegativeExpr};
+    use datafusion_physical_expr::intervals::check_support;
+    use datafusion_physical_expr::PhysicalExpr;
+    use std::sync::Arc;
+
+    #[test]
+    fn check_expr_supported() {
+        let supported_expr = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Plus,
+            Arc::new(Column::new("a", 0)),
+        )) as Arc<dyn PhysicalExpr>;
+        assert!(check_support(&supported_expr));
+        let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn 
PhysicalExpr>;
+        assert!(check_support(&supported_expr_2));
+        let unsupported_expr = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Or,
+            Arc::new(Column::new("a", 0)),
+        )) as Arc<dyn PhysicalExpr>;
+        assert!(!check_support(&unsupported_expr));
+        let unsupported_expr_2 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Or,
+            Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
+        )) as Arc<dyn PhysicalExpr>;
+        assert!(!check_support(&unsupported_expr_2));
+    }
+}
+
+#[cfg(test)]
+mod hash_join_tests {
+    use super::*;
+    use crate::physical_optimizer::join_selection::swap_join_type;
+    use crate::physical_optimizer::test_utils::SourceType;
+    use crate::physical_plan::expressions::Column;
+    use crate::physical_plan::joins::PartitionMode;
+    use crate::physical_plan::projection::ProjectionExec;
+    use crate::test_util::UnboundedExec;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::utils::DataPtr;
+    use datafusion_common::JoinType;
+    use std::sync::Arc;
+
+    struct TestCase {
+        case: String,
+        initial_sources_unbounded: (SourceType, SourceType),
+        initial_join_type: JoinType,
+        initial_mode: PartitionMode,
+        expected_sources_unbounded: (SourceType, SourceType),
+        expected_join_type: JoinType,
+        expected_mode: PartitionMode,
+        expecting_swap: bool,
+    }
+
+    #[tokio::test]
+    async fn test_join_with_swap_full() -> Result<()> {
+        // NOTE: Currently, some initial conditions are not viable after join 
order selection.
+        //       For example, full join always comes in partitioned mode. See 
the warning in
+        //       function "swap". If this changes in the future, we should 
update these tests.
+        let cases = vec![
+            TestCase {
+                case: "Bounded - Unbounded 1".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+            TestCase {
+                case: "Unbounded - Bounded 2".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+            TestCase {
+                case: "Bounded - Bounded 3".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+            TestCase {
+                case: "Unbounded - Unbounded 4".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+        ];
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_cases_without_collect_left_check() -> Result<()> {
+        let mut cases = vec![];
+        let join_types = vec![JoinType::LeftSemi, JoinType::Inner];
+        for join_type in join_types {
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: swap_join_type(join_type),
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: true,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: swap_join_type(join_type),
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: true,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+        }
+
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_not_support_collect_left() -> Result<()> {
+        let mut cases = vec![];
+        // After [JoinSelection] optimization, these join types cannot run in 
CollectLeft mode except
+        // [JoinType::LeftSemi]
+        let the_ones_not_support_collect_left = vec![JoinType::Left, 
JoinType::LeftAnti];
+        for join_type in the_ones_not_support_collect_left {
+            cases.push(TestCase {
+                case: "Unbounded - Bounded".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: swap_join_type(join_type),
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: true,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+        }
+
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> {
+        let mut cases = vec![];
+        let the_ones_not_support_collect_left =
+            vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi];
+        for join_type in the_ones_not_support_collect_left {
+            // We expect that (SourceType::Unbounded, SourceType::Bounded) 
will change, regardless of the
+            // statistics.
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            // We expect that (SourceType::Bounded, SourceType::Unbounded) 
will stay same, regardless of the
+            // statistics.
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            //
+            cases.push(TestCase {
+                case: "Bounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            // If cases are partitioned, only unbounded & bounded check will 
affect the order.
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+        }
+
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+
+    async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> 
Result<()> {
+        let left_unbounded = t.initial_sources_unbounded.0 == 
SourceType::Unbounded;
+        let right_unbounded = t.initial_sources_unbounded.1 == 
SourceType::Unbounded;
+        let left_exec = Arc::new(UnboundedExec::new(
+            (!left_unbounded).then_some(1),
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+                "a",
+                DataType::Int32,
+                false,
+            )]))),
+            2,
+        )) as Arc<dyn ExecutionPlan>;
+        let right_exec = Arc::new(UnboundedExec::new(
+            (!right_unbounded).then_some(1),
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+                "b",
+                DataType::Int32,
+                false,
+            )]))),
+            2,
+        )) as Arc<dyn ExecutionPlan>;
+
+        let join = HashJoinExec::try_new(
+            Arc::clone(&left_exec),
+            Arc::clone(&right_exec),
+            vec![(
+                Column::new_with_schema("a", &left_exec.schema())?,
+                Column::new_with_schema("b", &right_exec.schema())?,
+            )],
+            None,
+            &t.initial_join_type,
+            t.initial_mode,
+            false,
+        )?;
+
+        let initial_hash_join_state = PipelineStatePropagator {
+            plan: Arc::new(join),
+            unbounded: false,
+            children_unbounded: vec![left_unbounded, right_unbounded],
+        };
+
+        let optimized_hash_join =
+            hash_join_swap_subrule(initial_hash_join_state, 
&ConfigOptions::new())
+                .unwrap()?;
+        let optimized_join_plan = optimized_hash_join.plan;
+
+        // If swap did happen
+        let projection_added = 
optimized_join_plan.as_any().is::<ProjectionExec>();
+        let plan = if projection_added {
+            let proj = optimized_join_plan
+                .as_any()
+                .downcast_ref::<ProjectionExec>()
+                .expect(
+                    "A proj is required to swap columns back to their original 
order",
+                );
+            proj.input().clone()
+        } else {
+            optimized_join_plan
+        };
+
+        if let Some(HashJoinExec {
+            left,
+            right,
+            join_type,
+            mode,
+            ..
+        }) = plan.as_any().downcast_ref::<HashJoinExec>()
+        {
+            let left_changed = Arc::data_ptr_eq(left, &right_exec);
+            let right_changed = Arc::data_ptr_eq(right, &left_exec);
+            // If this is not equal, we have a bigger problem.
+            assert_eq!(left_changed, right_changed);
+            assert_eq!(
+                (
+                    t.case.as_str(),
+                    if left.unbounded_output(&[])? {
+                        SourceType::Unbounded
+                    } else {
+                        SourceType::Bounded
+                    },
+                    if right.unbounded_output(&[])? {
+                        SourceType::Unbounded
+                    } else {
+                        SourceType::Bounded
+                    },
+                    join_type,
+                    mode,
+                    left_changed && right_changed
+                ),
+                (
+                    t.case.as_str(),
+                    t.expected_sources_unbounded.0,
+                    t.expected_sources_unbounded.1,
+                    &t.expected_join_type,
+                    &t.expected_mode,
+                    t.expecting_swap
+                )
+            );
+        };
+        Ok(())
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index 8ee95ea663..e1a5089879 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -35,7 +35,6 @@ pub mod sort_enforcement;
 mod sort_pushdown;
 mod utils;
 
-pub mod pipeline_fixer;
 #[cfg(test)]
 pub mod test_utils;
 
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs 
b/datafusion/core/src/physical_optimizer/optimizer.rs
index d35c82abd2..3f6698c6cf 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -26,7 +26,6 @@ use 
crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAgg
 use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
 use crate::physical_optimizer::join_selection::JoinSelection;
 use crate::physical_optimizer::pipeline_checker::PipelineChecker;
-use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
 use crate::physical_optimizer::repartition::Repartition;
 use crate::physical_optimizer::sort_enforcement::EnforceSorting;
 use crate::{error::Result, physical_plan::ExecutionPlan};
@@ -76,12 +75,6 @@ impl PhysicalOptimizer {
             // repartitioning and local sorting steps to meet distribution and 
ordering requirements.
             // Therefore, it should run before EnforceDistribution and 
EnforceSorting.
             Arc::new(JoinSelection::new()),
-            // If the query is processing infinite inputs, the PipelineFixer 
rule applies the
-            // necessary transformations to make the query runnable (if it is 
not already runnable).
-            // If the query can not be made runnable, the rule emits an error 
with a diagnostic message.
-            // Since the transformations it applies may alter output 
partitioning properties of operators
-            // (e.g. by swapping hash join sides), this rule runs before 
EnforceDistribution.
-            Arc::new(PipelineFixer::new()),
             // In order to increase the parallelism, the Repartition rule will 
change the
             // output partitioning of some operators in the plan tree, which 
will influence
             // other rules. Therefore, it should run as soon as possible. It 
is optional because:
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs 
b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
deleted file mode 100644
index 7db3e99c39..0000000000
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ /dev/null
@@ -1,716 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! The [PipelineFixer] rule tries to modify a given plan so that it can
-//! accommodate its infinite sources, if there are any. In other words,
-//! it tries to obtain a runnable query (with the given infinite sources)
-//! from a non-runnable query by transforming pipeline-breaking operations
-//! to pipeline-friendly ones. If this can not be done, the rule emits a
-//! diagnostic error message.
-//!
-use crate::config::ConfigOptions;
-use crate::error::Result;
-use crate::physical_optimizer::join_selection::swap_hash_join;
-use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
-use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::{
-    HashJoinExec, PartitionMode, StreamJoinPartitionMode, 
SymmetricHashJoinExec,
-};
-use crate::physical_plan::ExecutionPlan;
-use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::DataFusionError;
-use datafusion_expr::logical_plan::JoinType;
-
-use std::sync::Arc;
-
-/// The [`PipelineFixer`] rule tries to modify a given plan so that it can
-/// accommodate its infinite sources, if there are any. If this is not
-/// possible, the rule emits a diagnostic error message.
-#[derive(Default)]
-pub struct PipelineFixer {}
-
-impl PipelineFixer {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-/// [`PipelineFixer`] subrules are functions of this type. Such functions take 
a
-/// single [`PipelineStatePropagator`] argument, which stores state variables
-/// indicating the unboundedness status of the current [`ExecutionPlan`] as
-/// the `PipelineFixer` rule traverses the entire plan tree.
-type PipelineFixerSubrule =
-    dyn Fn(PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
-
-impl PhysicalOptimizerRule for PipelineFixer {
-    fn optimize(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let pipeline = PipelineStatePropagator::new(plan);
-        let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
-            Box::new(hash_join_convert_symmetric_subrule),
-            Box::new(hash_join_swap_subrule),
-        ];
-        let state = pipeline.transform_up(&|p| apply_subrules(p, &subrules))?;
-        Ok(state.plan)
-    }
-
-    fn name(&self) -> &str {
-        "PipelineFixer"
-    }
-
-    fn schema_check(&self) -> bool {
-        true
-    }
-}
-
-/// This subrule checks if one can replace a hash join with a symmetric hash
-/// join so that the pipeline does not break due to the join operation in
-/// question. If possible, it makes this replacement; otherwise, it has no
-/// effect.
-fn hash_join_convert_symmetric_subrule(
-    mut input: PipelineStatePropagator,
-) -> Option<Result<PipelineStatePropagator>> {
-    if let Some(hash_join) = 
input.plan.as_any().downcast_ref::<HashJoinExec>() {
-        let ub_flags = &input.children_unbounded;
-        let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
-        input.unbounded = left_unbounded || right_unbounded;
-        let result = if left_unbounded && right_unbounded {
-            SymmetricHashJoinExec::try_new(
-                hash_join.left().clone(),
-                hash_join.right().clone(),
-                hash_join
-                    .on()
-                    .iter()
-                    .map(|(l, r)| (l.clone(), r.clone()))
-                    .collect(),
-                hash_join.filter().cloned(),
-                hash_join.join_type(),
-                hash_join.null_equals_null(),
-                StreamJoinPartitionMode::Partitioned,
-            )
-            .map(|exec| {
-                input.plan = Arc::new(exec) as _;
-                input
-            })
-        } else {
-            Ok(input)
-        };
-        Some(result)
-    } else {
-        None
-    }
-}
-
-/// This subrule will swap build/probe sides of a hash join depending on 
whether its inputs
-/// may produce an infinite stream of records. The rule ensures that the left 
(build) side
-/// of the hash join always operates on an input stream that will produce a 
finite set of
-/// records. If the left side can not be chosen to be "finite", the order stays 
the
-/// same as the original query.
-/// ```text
-/// For example, this rule makes the following transformation:
-///
-///
-///
-///           +--------------+              +--------------+
-///           |              |  unbounded   |              |
-///    Left   | Infinite     |    true      | Hash         |\true
-///           | Data source  |--------------| Repartition  | \   
+--------------+       +--------------+
-///           |              |              |              |  \  |             
 |       |              |
-///           +--------------+              +--------------+   - |  Hash Join  
 |-------| Projection   |
-///                                                            - |             
 |       |              |
-///           +--------------+              +--------------+  /  
+--------------+       +--------------+
-///           |              |  unbounded   |              | /
-///    Right  | Finite       |    false     | Hash         |/false
-///           | Data Source  |--------------| Repartition  |
-///           |              |              |              |
-///           +--------------+              +--------------+
-///
-///
-///
-///           +--------------+              +--------------+
-///           |              |  unbounded   |              |
-///    Left   | Finite       |    false     | Hash         |\false
-///           | Data source  |--------------| Repartition  | \   
+--------------+       +--------------+
-///           |              |              |              |  \  |             
 | true  |              | true
-///           +--------------+              +--------------+   - |  Hash Join  
 |-------| Projection   |-----
-///                                                            - |             
 |       |              |
-///           +--------------+              +--------------+  /  
+--------------+       +--------------+
-///           |              |  unbounded   |              | /
-///    Right  | Infinite     |    true      | Hash         |/true
-///           | Data Source  |--------------| Repartition  |
-///           |              |              |              |
-///           +--------------+              +--------------+
-///
-/// ```
-fn hash_join_swap_subrule(
-    mut input: PipelineStatePropagator,
-) -> Option<Result<PipelineStatePropagator>> {
-    if let Some(hash_join) = 
input.plan.as_any().downcast_ref::<HashJoinExec>() {
-        let ub_flags = &input.children_unbounded;
-        let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
-        input.unbounded = left_unbounded || right_unbounded;
-        let result = if left_unbounded
-            && !right_unbounded
-            && matches!(
-                *hash_join.join_type(),
-                JoinType::Inner
-                    | JoinType::Left
-                    | JoinType::LeftSemi
-                    | JoinType::LeftAnti
-            ) {
-            swap(hash_join).map(|plan| {
-                input.plan = plan;
-                input
-            })
-        } else {
-            Ok(input)
-        };
-        Some(result)
-    } else {
-        None
-    }
-}
-
-/// This function swaps sides of a hash join to make it runnable even if one 
of its
-/// inputs are infinite. Note that this is not always possible; i.e. 
[JoinType::Full],
-/// [JoinType::Right], [JoinType::RightAnti] and [JoinType::RightSemi] can not 
run with
-/// an unbounded left side, even if we swap. Therefore, we do not consider 
them here.
-fn swap(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
-    let partition_mode = hash_join.partition_mode();
-    let join_type = hash_join.join_type();
-    match (*partition_mode, *join_type) {
-        (
-            _,
-            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | 
JoinType::Full,
-        ) => Err(DataFusionError::Internal(format!(
-            "{join_type} join cannot be swapped for unbounded input."
-        ))),
-        (PartitionMode::Partitioned, _) => {
-            swap_hash_join(hash_join, PartitionMode::Partitioned)
-        }
-        (PartitionMode::CollectLeft, _) => {
-            swap_hash_join(hash_join, PartitionMode::CollectLeft)
-        }
-        (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
-            "Auto is not acceptable for unbounded input here.".to_string(),
-        )),
-    }
-}
-
-fn apply_subrules(
-    mut input: PipelineStatePropagator,
-    subrules: &Vec<Box<PipelineFixerSubrule>>,
-) -> Result<Transformed<PipelineStatePropagator>> {
-    for subrule in subrules {
-        if let Some(value) = subrule(input.clone()).transpose()? {
-            input = value;
-        }
-    }
-    let is_unbounded = input
-        .plan
-        .unbounded_output(&input.children_unbounded)
-        // Treat the case where an operator can not run on unbounded data as
-        // if it can and it outputs unbounded data. Do not raise an error yet.
-        // Such operators may be fixed, adjusted or replaced later on during
-        // optimization passes -- sorts may be removed, windows may be adjusted
-        // etc. If this doesn't happen, the final `PipelineChecker` rule will
-        // catch this and raise an error anyway.
-        .unwrap_or(true);
-    input.unbounded = is_unbounded;
-    Ok(Transformed::Yes(input))
-}
-
-#[cfg(test)]
-mod util_tests {
-    use datafusion_expr::Operator;
-    use datafusion_physical_expr::expressions::{BinaryExpr, Column, 
NegativeExpr};
-    use datafusion_physical_expr::intervals::check_support;
-    use datafusion_physical_expr::PhysicalExpr;
-    use std::sync::Arc;
-
-    #[test]
-    fn check_expr_supported() {
-        let supported_expr = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new("a", 0)),
-            Operator::Plus,
-            Arc::new(Column::new("a", 0)),
-        )) as Arc<dyn PhysicalExpr>;
-        assert!(check_support(&supported_expr));
-        let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn 
PhysicalExpr>;
-        assert!(check_support(&supported_expr_2));
-        let unsupported_expr = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new("a", 0)),
-            Operator::Or,
-            Arc::new(Column::new("a", 0)),
-        )) as Arc<dyn PhysicalExpr>;
-        assert!(!check_support(&unsupported_expr));
-        let unsupported_expr_2 = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new("a", 0)),
-            Operator::Or,
-            Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
-        )) as Arc<dyn PhysicalExpr>;
-        assert!(!check_support(&unsupported_expr_2));
-    }
-}
-
-#[cfg(test)]
-mod hash_join_tests {
-    use super::*;
-    use crate::physical_optimizer::join_selection::swap_join_type;
-    use crate::physical_optimizer::test_utils::SourceType;
-    use crate::physical_plan::expressions::Column;
-    use crate::physical_plan::joins::PartitionMode;
-    use crate::physical_plan::projection::ProjectionExec;
-    use crate::test_util::UnboundedExec;
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::utils::DataPtr;
-    use std::sync::Arc;
-
-    struct TestCase {
-        case: String,
-        initial_sources_unbounded: (SourceType, SourceType),
-        initial_join_type: JoinType,
-        initial_mode: PartitionMode,
-        expected_sources_unbounded: (SourceType, SourceType),
-        expected_join_type: JoinType,
-        expected_mode: PartitionMode,
-        expecting_swap: bool,
-    }
-
-    #[tokio::test]
-    async fn test_join_with_swap_full() -> Result<()> {
-        // NOTE: Currently, some initial conditions are not viable after join 
order selection.
-        //       For example, full join always comes in partitioned mode. See 
the warning in
-        //       function "swap". If this changes in the future, we should 
update these tests.
-        let cases = vec![
-            TestCase {
-                case: "Bounded - Unbounded 1".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                initial_join_type: JoinType::Full,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: JoinType::Full,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            },
-            TestCase {
-                case: "Unbounded - Bounded 2".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                initial_join_type: JoinType::Full,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                expected_join_type: JoinType::Full,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            },
-            TestCase {
-                case: "Bounded - Bounded 3".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                initial_join_type: JoinType::Full,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                expected_join_type: JoinType::Full,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            },
-            TestCase {
-                case: "Unbounded - Unbounded 4".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
-                initial_join_type: JoinType::Full,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (
-                    SourceType::Unbounded,
-                    SourceType::Unbounded,
-                ),
-                expected_join_type: JoinType::Full,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            },
-        ];
-        for case in cases.into_iter() {
-            test_join_with_maybe_swap_unbounded_case(case).await?
-        }
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_cases_without_collect_left_check() -> Result<()> {
-        let mut cases = vec![];
-        let join_types = vec![JoinType::LeftSemi, JoinType::Inner];
-        for join_type in join_types {
-            cases.push(TestCase {
-                case: "Unbounded - Bounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: swap_join_type(join_type),
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: true,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Unbounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Unbounded - Unbounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (
-                    SourceType::Unbounded,
-                    SourceType::Unbounded,
-                ),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Bounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Unbounded - Bounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: swap_join_type(join_type),
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: true,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Unbounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Bounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Unbounded - Unbounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (
-                    SourceType::Unbounded,
-                    SourceType::Unbounded,
-                ),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-        }
-
-        for case in cases.into_iter() {
-            test_join_with_maybe_swap_unbounded_case(case).await?
-        }
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_not_support_collect_left() -> Result<()> {
-        let mut cases = vec![];
-        // After [JoinSelection] optimization, these join types cannot run in 
CollectLeft mode except
-        // [JoinType::LeftSemi]
-        let the_ones_not_support_collect_left = vec![JoinType::Left, 
JoinType::LeftAnti];
-        for join_type in the_ones_not_support_collect_left {
-            cases.push(TestCase {
-                case: "Unbounded - Bounded".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: swap_join_type(join_type),
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: true,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Unbounded".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Bounded".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Unbounded - Unbounded".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (
-                    SourceType::Unbounded,
-                    SourceType::Unbounded,
-                ),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-        }
-
-        for case in cases.into_iter() {
-            test_join_with_maybe_swap_unbounded_case(case).await?
-        }
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> {
-        let mut cases = vec![];
-        let the_ones_not_support_collect_left =
-            vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi];
-        for join_type in the_ones_not_support_collect_left {
-            // We expect that (SourceType::Unbounded, SourceType::Bounded) 
will change, regardless of the
-            // statistics.
-            cases.push(TestCase {
-                case: "Unbounded - Bounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: false,
-            });
-            // We expect that (SourceType::Bounded, SourceType::Unbounded) 
will stay same, regardless of the
-            // statistics.
-            cases.push(TestCase {
-                case: "Bounded - Unbounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Unbounded - Unbounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (
-                    SourceType::Unbounded,
-                    SourceType::Unbounded,
-                ),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: false,
-            });
-            //
-            cases.push(TestCase {
-                case: "Bounded - Bounded / CollectLeft".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::CollectLeft,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::CollectLeft,
-                expecting_swap: false,
-            });
-            // If cases are partitioned, only unbounded & bounded check will 
affect the order.
-            cases.push(TestCase {
-                case: "Unbounded - Bounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Unbounded, 
SourceType::Bounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Unbounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Unbounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Bounded - Bounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (SourceType::Bounded, 
SourceType::Bounded),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-            cases.push(TestCase {
-                case: "Unbounded - Unbounded / Partitioned".to_string(),
-                initial_sources_unbounded: (SourceType::Unbounded, 
SourceType::Unbounded),
-                initial_join_type: join_type,
-                initial_mode: PartitionMode::Partitioned,
-                expected_sources_unbounded: (
-                    SourceType::Unbounded,
-                    SourceType::Unbounded,
-                ),
-                expected_join_type: join_type,
-                expected_mode: PartitionMode::Partitioned,
-                expecting_swap: false,
-            });
-        }
-
-        for case in cases.into_iter() {
-            test_join_with_maybe_swap_unbounded_case(case).await?
-        }
-        Ok(())
-    }
-
-    async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> 
Result<()> {
-        let left_unbounded = t.initial_sources_unbounded.0 == 
SourceType::Unbounded;
-        let right_unbounded = t.initial_sources_unbounded.1 == 
SourceType::Unbounded;
-        let left_exec = Arc::new(UnboundedExec::new(
-            (!left_unbounded).then_some(1),
-            RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
-                "a",
-                DataType::Int32,
-                false,
-            )]))),
-            2,
-        )) as Arc<dyn ExecutionPlan>;
-        let right_exec = Arc::new(UnboundedExec::new(
-            (!right_unbounded).then_some(1),
-            RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
-                "b",
-                DataType::Int32,
-                false,
-            )]))),
-            2,
-        )) as Arc<dyn ExecutionPlan>;
-
-        let join = HashJoinExec::try_new(
-            Arc::clone(&left_exec),
-            Arc::clone(&right_exec),
-            vec![(
-                Column::new_with_schema("a", &left_exec.schema())?,
-                Column::new_with_schema("b", &right_exec.schema())?,
-            )],
-            None,
-            &t.initial_join_type,
-            t.initial_mode,
-            false,
-        )?;
-
-        let initial_hash_join_state = PipelineStatePropagator {
-            plan: Arc::new(join),
-            unbounded: false,
-            children_unbounded: vec![left_unbounded, right_unbounded],
-        };
-        let optimized_hash_join =
-            hash_join_swap_subrule(initial_hash_join_state).unwrap()?;
-        let optimized_join_plan = optimized_hash_join.plan;
-
-        // If swap did happen
-        let projection_added = 
optimized_join_plan.as_any().is::<ProjectionExec>();
-        let plan = if projection_added {
-            let proj = optimized_join_plan
-                .as_any()
-                .downcast_ref::<ProjectionExec>()
-                .expect(
-                    "A proj is required to swap columns back to their original 
order",
-                );
-            proj.input().clone()
-        } else {
-            optimized_join_plan
-        };
-
-        if let Some(HashJoinExec {
-            left,
-            right,
-            join_type,
-            mode,
-            ..
-        }) = plan.as_any().downcast_ref::<HashJoinExec>()
-        {
-            let left_changed = Arc::data_ptr_eq(left, &right_exec);
-            let right_changed = Arc::data_ptr_eq(right, &left_exec);
-            // If this is not equal, we have a bigger problem.
-            assert_eq!(left_changed, right_changed);
-            assert_eq!(
-                (
-                    t.case.as_str(),
-                    if left.unbounded_output(&[])? {
-                        SourceType::Unbounded
-                    } else {
-                        SourceType::Bounded
-                    },
-                    if right.unbounded_output(&[])? {
-                        SourceType::Unbounded
-                    } else {
-                        SourceType::Bounded
-                    },
-                    join_type,
-                    mode,
-                    left_changed && right_changed
-                ),
-                (
-                    t.case.as_str(),
-                    t.expected_sources_unbounded.0,
-                    t.expected_sources_unbounded.1,
-                    &t.expected_join_type,
-                    &t.expected_mode,
-                    t.expecting_swap
-                )
-            );
-        };
-        Ok(())
-    }
-}
diff --git a/datafusion/core/tests/memory_limit.rs 
b/datafusion/core/tests/memory_limit.rs
index 36872b7361..5b18d616b3 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::MemTable;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_optimizer::pipeline_fixer::PipelineFixer;
+use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_common::assert_contains;
@@ -304,7 +304,7 @@ async fn run_streaming_test_with_config(
     // Disable all physical optimizer rules except the PipelineFixer rule to 
avoid sorts or
     // repartition, as they also have memory budgets that may be hit first
     let state = SessionState::with_config_rt(config, Arc::new(runtime))
-        .with_physical_optimizer_rules(vec![Arc::new(PipelineFixer::new())]);
+        .with_physical_optimizer_rules(vec![Arc::new(JoinSelection::new())]);
 
     // Create a new session context with the session state
     let ctx = SessionContext::with_state(state);
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt 
b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index 56f2fdf10a..bd3513550a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -240,7 +240,6 @@ logical_plan TableScan: simple_explain_test projection=[a, 
b, c]
 initial_physical_plan CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], has_header=true
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
-physical_plan after PipelineFixer SAME TEXT AS ABOVE
 physical_plan after repartition SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE

Reply via email to