This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd45e052258 [FLINK-34555][table] Migrate JoinConditionTypeCoerceRule 
to java
dd45e052258 is described below

commit dd45e0522588ea594e4a92fd98d8115363a5700a
Author: Jacky Lau <[email protected]>
AuthorDate: Fri Oct 4 05:23:09 2024 +0800

    [FLINK-34555][table] Migrate JoinConditionTypeCoerceRule to java
---
 .../rules/logical/JoinConditionTypeCoerceRule.java | 178 +++++++++++++++++++++
 .../logical/JoinConditionTypeCoerceRule.scala      | 123 --------------
 2 files changed, 178 insertions(+), 123 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.java
new file mode 100644
index 00000000000..8558256c699
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.immutables.value.Value;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Planner rule that coerces the both sides of EQUALS(`=`) operator in Join 
condition to the same
+ * type while sans nullability.
+ *
+ * <p>For most cases, we already did the type coercion during type validation 
by implicit type
+ * coercion or during sqlNode to relNode conversion, this rule just does a 
rechecking to ensure a
+ * strongly uniform equals type, so that during a HashJoin shuffle we can have 
the same hashcode of
+ * the same value.
+ */
[email protected]
+public class JoinConditionTypeCoerceRule
+        extends 
RelRule<JoinConditionTypeCoerceRule.JoinConditionTypeCoerceRuleConfig> {
+
+    public static final JoinConditionTypeCoerceRule INSTANCE =
+            
JoinConditionTypeCoerceRule.JoinConditionTypeCoerceRuleConfig.DEFAULT.toRule();
+
+    private JoinConditionTypeCoerceRule(JoinConditionTypeCoerceRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        Join join = call.rel(0);
+        if (join.getCondition().isAlwaysTrue()) {
+            return false;
+        }
+        RelDataTypeFactory typeFactory = call.builder().getTypeFactory();
+        return hasEqualsRefsOfDifferentTypes(typeFactory, join.getCondition());
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Join join = call.rel(0);
+        RelBuilder builder = call.builder();
+        RexBuilder rexBuilder = builder.getRexBuilder();
+        RelDataTypeFactory typeFactory = builder.getTypeFactory();
+
+        List<RexNode> joinFilters = 
RelOptUtil.conjunctions(join.getCondition());
+        List<RexNode> newJoinFilters =
+                joinFilters.stream()
+                        .map(
+                                filter -> {
+                                    if (filter instanceof RexCall) {
+                                        RexCall c = (RexCall) filter;
+                                        if (c.getKind() == SqlKind.EQUALS) {
+                                            RexNode leftOp = 
c.getOperands().get(0);
+                                            RexNode rightOp = 
c.getOperands().get(1);
+                                            if (leftOp instanceof RexInputRef
+                                                    && rightOp instanceof 
RexInputRef) {
+                                                RexInputRef ref1 = 
(RexInputRef) leftOp;
+                                                RexInputRef ref2 = 
(RexInputRef) rightOp;
+                                                if 
(!SqlTypeUtil.equalSansNullability(
+                                                        typeFactory,
+                                                        ref1.getType(),
+                                                        ref2.getType())) {
+                                                    List<RelDataType> refTypes 
=
+                                                            Arrays.asList(
+                                                                    
ref1.getType(), ref2.getType());
+                                                    RelDataType targetType =
+                                                            
typeFactory.leastRestrictive(refTypes);
+                                                    if (targetType == null) {
+                                                        throw new 
TableException(
+                                                                "implicit type 
conversion between "
+                                                                        + 
ref1.getType()
+                                                                        + " 
and "
+                                                                        + 
ref2.getType()
+                                                                        + " is 
not supported on join's condition now");
+                                                    }
+                                                    return builder.equals(
+                                                            
rexBuilder.ensureType(
+                                                                    
targetType, ref1, true),
+                                                            
rexBuilder.ensureType(
+                                                                    
targetType, ref2, true));
+                                                }
+                                            }
+                                        }
+                                    }
+                                    return filter;
+                                })
+                        .collect(Collectors.toList());
+
+        RexNode newCondExp =
+                builder.and(
+                        FlinkRexUtil.simplify(
+                                rexBuilder,
+                                builder.and(newJoinFilters),
+                                join.getCluster().getPlanner().getExecutor()));
+
+        Join newJoin =
+                join.copy(
+                        join.getTraitSet(),
+                        newCondExp,
+                        join.getLeft(),
+                        join.getRight(),
+                        join.getJoinType(),
+                        join.isSemiJoinDone());
+
+        call.transformTo(newJoin);
+    }
+
+    /**
+     * Returns true if two input refs of an equal call have different types in 
join condition, else
+     * false.
+     */
+    private boolean hasEqualsRefsOfDifferentTypes(
+            RelDataTypeFactory typeFactory, RexNode predicate) {
+        List<RexNode> conjunctions = RelOptUtil.conjunctions(predicate);
+        return conjunctions.stream()
+                .filter(node -> node instanceof RexCall && node.getKind() == 
SqlKind.EQUALS)
+                .anyMatch(
+                        c -> {
+                            RexCall call = (RexCall) c;
+                            RexNode ref1 = call.getOperands().get(0);
+                            RexNode ref2 = call.getOperands().get(1);
+                            return ref1 instanceof RexInputRef
+                                    && ref2 instanceof RexInputRef
+                                    && !SqlTypeUtil.equalSansNullability(
+                                            typeFactory, ref1.getType(), 
ref2.getType());
+                        });
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface JoinConditionTypeCoerceRuleConfig extends RelRule.Config {
+        JoinConditionTypeCoerceRule.JoinConditionTypeCoerceRuleConfig DEFAULT =
+                
ImmutableJoinConditionTypeCoerceRule.JoinConditionTypeCoerceRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(b0 -> 
b0.operand(Join.class).anyInputs())
+                        .withDescription("JoinConditionTypeCoerceRule");
+
+        @Override
+        default JoinConditionTypeCoerceRule toRule() {
+            return new JoinConditionTypeCoerceRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
deleted file mode 100644
index 41bff375301..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.`type`.RelDataTypeFactory
-import org.apache.calcite.rel.core.Join
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.`type`.SqlTypeUtil
-import org.apache.calcite.sql.SqlKind
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-
-/**
- * Planner rule that coerces the both sides of EQUALS(`=`) operator in Join 
condition to the same
- * type while sans nullability.
- *
- * <p>For most cases, we already did the type coercion during type validation 
by implicit type
- * coercion or during sqlNode to relNode conversion, this rule just does a 
rechecking to ensure a
- * strongly uniform equals type, so that during a HashJoin shuffle we can have 
the same hashcode of
- * the same value.
- */
-class JoinConditionTypeCoerceRule
-  extends RelOptRule(operand(classOf[Join], any), 
"JoinConditionTypeCoerceRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join: Join = call.rel(0)
-    if (join.getCondition.isAlwaysTrue) {
-      return false
-    }
-    val typeFactory = call.builder().getTypeFactory
-    hasEqualsRefsOfDifferentTypes(typeFactory, join.getCondition)
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val join: Join = call.rel(0)
-    val builder = call.builder()
-    val rexBuilder = builder.getRexBuilder
-    val typeFactory = builder.getTypeFactory
-
-    val newJoinFilters = mutable.ArrayBuffer[RexNode]()
-    val joinFilters = RelOptUtil.conjunctions(join.getCondition)
-    joinFilters.foreach {
-      case c: RexCall if c.isA(SqlKind.EQUALS) =>
-        (c.operands.head, c.operands.last) match {
-          case (ref1: RexInputRef, ref2: RexInputRef)
-              if !SqlTypeUtil.equalSansNullability(typeFactory, ref1.getType, 
ref2.getType) =>
-            val refList = ref1 :: ref2 :: Nil
-            val targetType = typeFactory.leastRestrictive(refList.map(ref => 
ref.getType))
-            if (targetType == null) {
-              throw new TableException(
-                s"implicit type conversion between" +
-                  s" ${ref1.getType} and ${ref2.getType} " +
-                  s"is not supported on join's condition now")
-            }
-            newJoinFilters += builder.equals(
-              rexBuilder.ensureType(targetType, ref1, true),
-              rexBuilder.ensureType(targetType, ref2, true))
-          case _ =>
-            newJoinFilters += c
-        }
-      case r: RexNode =>
-        newJoinFilters += r
-    }
-
-    val newCondExp = builder.and(
-      FlinkRexUtil
-        .simplify(rexBuilder, builder.and(newJoinFilters), 
join.getCluster.getPlanner.getExecutor))
-
-    val newJoin = join.copy(
-      join.getTraitSet,
-      newCondExp,
-      join.getLeft,
-      join.getRight,
-      join.getJoinType,
-      join.isSemiJoinDone)
-
-    call.transformTo(newJoin)
-  }
-
-  /**
-   * Returns true if two input refs of an equal call have different types in 
join condition, else
-   * false.
-   */
-  private def hasEqualsRefsOfDifferentTypes(
-      typeFactory: RelDataTypeFactory,
-      predicate: RexNode): Boolean = {
-    val conjunctions = RelOptUtil.conjunctions(predicate)
-    conjunctions.exists {
-      case c: RexCall if c.isA(SqlKind.EQUALS) =>
-        (c.operands.head, c.operands.last) match {
-          case (ref1: RexInputRef, ref2: RexInputRef) =>
-            !SqlTypeUtil.equalSansNullability(typeFactory, ref1.getType, 
ref2.getType)
-          case _ => false
-        }
-      case _ => false
-    }
-  }
-}
-
-object JoinConditionTypeCoerceRule {
-  val INSTANCE = new JoinConditionTypeCoerceRule
-}

Reply via email to