mingmwang commented on code in PR #5322:
URL: https://github.com/apache/arrow-datafusion/pull/5322#discussion_r1119528713
##########
datafusion/core/src/physical_optimizer/pipeline_fixer.rs:
##########
@@ -77,6 +87,104 @@ impl PhysicalOptimizerRule for PipelineFixer {
}
}
+/// Indicates whether interval arithmetic is supported for the given expression.
+/// Currently, we do not support all [PhysicalExpr]s for interval calculations.
+/// We do not support every type of [Operator]s either. Over time, this check
+/// will relax as more types of [PhysicalExpr]s and [Operator]s are supported.
+/// Currently, [CastExpr], [BinaryExpr], [Column] and [Literal] are supported.
+fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
+ let expr_any = expr.as_any();
+ let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
+ {
+ is_operator_supported(binary_expr.op())
+ } else {
+ expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
+ };
+ expr_supported && expr.children().iter().all(check_support)
+}
+
+/// This function returns whether a given hash join is replaceable by a
+/// symmetric hash join. Basically, the requirement is that involved
+/// [PhysicalExpr]s, [Operator]s and data types need to be supported,
+/// and order information must cover every column in the filter expression.
+fn is_suitable_for_symmetric_hash_join(hash_join: &HashJoinExec) -> Result<bool> {
+ if let Some(filter) = hash_join.filter() {
+ let left = hash_join.left();
+ if let Some(left_ordering) = left.output_ordering() {
+ let right = hash_join.right();
+ if let Some(right_ordering) = right.output_ordering() {
+ let expr_supported = check_support(filter.expression());
+ let left_convertible = convert_sort_expr_with_filter_schema(
+ &JoinSide::Left,
+ filter,
+ &left.schema(),
+ &left_ordering[0],
+ )?
+ .is_some();
+ let right_convertible = convert_sort_expr_with_filter_schema(
+ &JoinSide::Right,
+ filter,
+ &right.schema(),
+ &right_ordering[0],
+ )?
+ .is_some();
+ let fields_supported = filter
+ .schema()
+ .fields()
+ .iter()
+ .all(|f| is_datatype_supported(f.data_type()));
+ return Ok(expr_supported
+ && fields_supported
+ && left_convertible
+ && right_convertible);
+ }
+ }
+ }
+ Ok(false)
+}
+
+/// This subrule checks if one can replace a hash join with a symmetric hash
+/// join so that the pipeline does not break due to the join operation in
+/// question. If possible, it makes this replacement; otherwise, it has no
+/// effect.
+fn hash_join_convert_symmetric_subrule(
+ input: PipelineStatePropagator,
+) -> Option<Result<PipelineStatePropagator>> {
+ let plan = input.plan;
+ if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+ let ub_flags = input.children_unbounded;
+ let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
Review Comment:
It seems we currently cannot move the `hash_join_convert_symmetric_subrule`
logic to the `JoinSelection` rule. However, I think the `hash_join_swap_subrule`
logic can be moved there: the original `JoinSelection` rule already has logic
to swap join sides based on estimated stats/size, but it does not take
bounded/unbounded inputs into consideration.
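
To make the suggestion concrete, here is a rough sketch of how a single swap decision could combine both signals. `InputInfo` and `should_swap` are hypothetical names for illustration, not existing DataFusion APIs, and the sketch ignores join-type restrictions. It assumes the left side is the build side, so an unbounded left input with a bounded right input should be swapped, and size estimates are only used as a tie-breaker.

```rust
/// Hypothetical, simplified description of one join input.
/// This is NOT a DataFusion type; it only illustrates the idea.
#[derive(Debug, Clone, Copy, PartialEq)]
struct InputInfo {
    /// Whether the input is an unbounded (streaming) source.
    unbounded: bool,
    /// Estimated size in bytes, if statistics are available.
    estimated_bytes: Option<usize>,
}

/// Returns `true` if the join sides should be swapped.
/// Assumption: the left side is the build side of the hash join.
fn should_swap(left: InputInfo, right: InputInfo) -> bool {
    match (left.unbounded, right.unbounded) {
        // Building on an unbounded side would break the pipeline,
        // so move the bounded side to the left.
        (true, false) => true,
        // The bounded side is already the build side; keep the order.
        (false, true) => false,
        // Same boundedness on both sides: fall back to size estimates
        // and prefer the smaller input as the build side.
        _ => match (left.estimated_bytes, right.estimated_bytes) {
            (Some(l), Some(r)) => l > r,
            // Without statistics on both sides, leave the plan as is.
            _ => false,
        },
    }
}
```

With this ordering, the boundedness check always takes priority over the stats-based check, so an unbounded input never ends up on the build side just because its size estimate is smaller.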