ozankabak commented on code in PR #5322:
URL: https://github.com/apache/arrow-datafusion/pull/5322#discussion_r1119605173


##########
datafusion/core/src/physical_optimizer/pipeline_fixer.rs:
##########
@@ -77,6 +87,104 @@ impl PhysicalOptimizerRule for PipelineFixer {
     }
 }
 
+/// Indicates whether interval arithmetic is supported for the given expression.
+/// Currently, we do not support all [PhysicalExpr]s for interval calculations.
+/// We do not support every type of [Operator]s either. Over time, this check
+/// will relax as more types of [PhysicalExpr]s and [Operator]s are supported.
+/// Currently, [CastExpr], [BinaryExpr], [Column] and [Literal] are supported.
+fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
+    let expr_any = expr.as_any();
+    let expr_supported = if let Some(binary_expr) = 
expr_any.downcast_ref::<BinaryExpr>()
+    {
+        is_operator_supported(binary_expr.op())
+    } else {
+        expr_any.is::<Column>() || expr_any.is::<Literal>() || 
expr_any.is::<CastExpr>()
+    };
+    expr_supported && expr.children().iter().all(check_support)
+}
+
+/// This function returns whether a given hash join is replaceable by a
+/// symmetric hash join. Basically, the requirement is that involved
+/// [PhysicalExpr]s, [Operator]s and data types need to be supported,
+/// and order information must cover every column in the filter expression.
+fn is_suitable_for_symmetric_hash_join(hash_join: &HashJoinExec) -> 
Result<bool> {
+    if let Some(filter) = hash_join.filter() {
+        let left = hash_join.left();
+        if let Some(left_ordering) = left.output_ordering() {
+            let right = hash_join.right();
+            if let Some(right_ordering) = right.output_ordering() {
+                let expr_supported = check_support(filter.expression());
+                let left_convertible = convert_sort_expr_with_filter_schema(
+                    &JoinSide::Left,
+                    filter,
+                    &left.schema(),
+                    &left_ordering[0],
+                )?
+                .is_some();
+                let right_convertible = convert_sort_expr_with_filter_schema(
+                    &JoinSide::Right,
+                    filter,
+                    &right.schema(),
+                    &right_ordering[0],
+                )?
+                .is_some();
+                let fields_supported = filter
+                    .schema()
+                    .fields()
+                    .iter()
+                    .all(|f| is_datatype_supported(f.data_type()));
+                return Ok(expr_supported
+                    && fields_supported
+                    && left_convertible
+                    && right_convertible);
+            }
+        }
+    }
+    Ok(false)
+}
+
+/// This subrule checks if one can replace a hash join with a symmetric hash
+/// join so that the pipeline does not break due to the join operation in
+/// question. If possible, it makes this replacement; otherwise, it has no
+/// effect.
+fn hash_join_convert_symmetric_subrule(
+    input: PipelineStatePropagator,
+) -> Option<Result<PipelineStatePropagator>> {
+    let plan = input.plan;
+    if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+        let ub_flags = input.children_unbounded;
+        let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);

Review Comment:
   We will think about this. I don't see a straightforward way right away, but I
will discuss it in detail with @metesynnada. If this turns out to be possible,
we can address it in a follow-up PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to