godfreyhe commented on a change in pull request #14606:
URL: https://github.com/apache/flink/pull/14606#discussion_r557118638
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashJoin.scala
##########
@@ -162,38 +155,40 @@ class BatchPhysicalHashJoin(
}
override def translateToExecNode(): ExecNode[_] = {
- val nonEquiPredicates = if (!joinInfo.isEqui) {
- joinInfo.getRemaining(getCluster.getRexBuilder)
- } else {
- null
- }
+ JoinUtil.validateJoinSpec(
+ joinSpec,
+ FlinkTypeFactory.toLogicalRowType(left.getRowType),
+ FlinkTypeFactory.toLogicalRowType(right.getRowType))
Review comment:
The `BatchExecHashJoin` operator already performs this validation when
translating to the plan; is this validation needed here? If it is needed, I
would prefer to do the validation when constructing this node.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashJoin.scala
##########
@@ -162,38 +155,40 @@ class BatchPhysicalHashJoin(
}
override def translateToExecNode(): ExecNode[_] = {
- val nonEquiPredicates = if (!joinInfo.isEqui) {
- joinInfo.getRemaining(getCluster.getRexBuilder)
- } else {
- null
- }
+ JoinUtil.validateJoinSpec(
+ joinSpec,
+ FlinkTypeFactory.toLogicalRowType(left.getRowType),
+ FlinkTypeFactory.toLogicalRowType(right.getRowType))
+
val mq = getCluster.getMetadataQuery
val leftRowSize = Util.first(mq.getAverageRowSize(left), 24).toInt
val leftRowCount = Util.first(mq.getRowCount(left), 200000).toLong
val rightRowSize = Util.first(mq.getAverageRowSize(right), 24).toInt
val rightRowCount = Util.first(mq.getRowCount(right), 200000).toLong
+ val (leftEdge, rightEdge) = getInputEdges
new BatchExecHashJoin(
- JoinTypeUtil.getFlinkJoinType(joinType),
- leftKeys,
- rightKeys,
- filterNulls,
- nonEquiPredicates,
+ joinSpec,
leftRowSize,
rightRowSize,
leftRowCount,
rightRowCount,
leftIsBuild,
tryDistinctBuildRow,
- getInputEdges,
+ leftEdge,
Review comment:
`BatchExecSortMergeJoin` and `BatchExecNestedLoopJoin` have been missed.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
##########
@@ -20,19 +20,20 @@ package
org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.nodes.logical._
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.util.Preconditions.checkState
+
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.JoinRelType
-import org.apache.flink.util.Preconditions.checkState
/**
- * Rule that matches a temporal join node and converts it to
[[StreamExecTemporalJoin]],
- * the temporal join node is a [[FlinkLogicalJoin]] which contains
[[TemporalJoinCondition]].
+ * Rule that matches a temporal join node and converts it to
[[StreamPhysicalTemporalJoin]],
+ * the temporal join node is a [[FlinkLogicalJoin]] which contains
[[TEMPORAL_JOIN_CONDITION]].
*/
-class StreamExecTemporalJoinRule
- extends StreamExecJoinRuleBase("StreamExecJoinRuleBase") {
+class StreamPhysicalTemporalJoinRule
+ extends StreamPhysicalJoinRuleBase("StreamPhysicalJoinRuleBase") {
Review comment:
The description should be `StreamPhysicalTemporalJoinRule`, not `StreamPhysicalJoinRuleBase`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/JoinSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.utils;
+
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** JoinSpec describes how two tables will be joined. */
+public class JoinSpec {
+ /** {@link FlinkJoinType} of the join. */
+ private final FlinkJoinType joinType;
+ /** 0-based index of join keys in left side. */
+ private final int[] leftKeys;
+ /** 0-based index of join keys in right side. */
+ private final int[] rightKeys;
+ /** whether to filter null values or not. */
+ private final boolean[] filterNulls;
+ /** Non Equi join conditions. */
+ private final @Nullable RexNode nonEquiCondition;
+
+ public JoinSpec(
+ FlinkJoinType joinType,
+ int[] leftKeys,
+ int[] rightKeys,
+ boolean[] filterNulls,
+ @Nullable RexNode nonEquiCondition) {
+ this.joinType = Preconditions.checkNotNull(joinType);
+ this.leftKeys = Preconditions.checkNotNull(leftKeys);
+ this.rightKeys = Preconditions.checkNotNull(rightKeys);
+ this.filterNulls = Preconditions.checkNotNull(filterNulls);
+ Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+ Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+
+ this.nonEquiCondition = nonEquiCondition;
+ }
+
+ public FlinkJoinType getJoinType() {
+ return joinType;
+ }
+
+ public int[] getLeftKeys() {
+ return leftKeys;
+ }
+
+ public int[] getRightKeys() {
+ return rightKeys;
+ }
+
+ public boolean[] getFilterNulls() {
+ return filterNulls;
+ }
+
+ @Nullable
+ public RexNode getNonEquiCondition() {
Review comment:
How about returning `Optional<RexNode>`?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/JoinSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.utils;
+
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** JoinSpec describes how two tables will be joined. */
+public class JoinSpec {
+ /** {@link FlinkJoinType} of the join. */
+ private final FlinkJoinType joinType;
+ /** 0-based index of join keys in left side. */
+ private final int[] leftKeys;
+ /** 0-based index of join keys in right side. */
+ private final int[] rightKeys;
+ /** whether to filter null values or not. */
+ private final boolean[] filterNulls;
+ /** Non Equi join conditions. */
+ private final @Nullable RexNode nonEquiCondition;
+
+ public JoinSpec(
+ FlinkJoinType joinType,
+ int[] leftKeys,
+ int[] rightKeys,
+ boolean[] filterNulls,
+ @Nullable RexNode nonEquiCondition) {
+ this.joinType = Preconditions.checkNotNull(joinType);
+ this.leftKeys = Preconditions.checkNotNull(leftKeys);
+ this.rightKeys = Preconditions.checkNotNull(rightKeys);
+ this.filterNulls = Preconditions.checkNotNull(filterNulls);
+ Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+ Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+
+ this.nonEquiCondition = nonEquiCondition;
+ }
+
+ public FlinkJoinType getJoinType() {
+ return joinType;
+ }
+
+ public int[] getLeftKeys() {
+ return leftKeys;
+ }
+
+ public int[] getRightKeys() {
+ return rightKeys;
+ }
+
+ public boolean[] getFilterNulls() {
+ return filterNulls;
+ }
+
+ @Nullable
+ public RexNode getNonEquiCondition() {
+ return nonEquiCondition;
+ }
+
+ /** Gets number of keys in join key. */
+ public int getJoinKeySize() {
+ return leftKeys.length;
+ }
+
+ /** Creates a JoinSpecBuilder. */
+ public static JoinSpecBuilder builder() {
+ return new JoinSpecBuilder();
+ }
+
+ /** Utils class to build a {@link JoinSpec}. */
+ public static class JoinSpecBuilder {
+ private FlinkJoinType joinType;
+ private final List<Integer> leftKeys = new ArrayList<>();
+ private final List<Integer> rightKeys = new ArrayList<>();
+ private final List<Boolean> filterNulls = new ArrayList<>();
+ private @Nullable RexNode nonEquiConditions;
+
+ public JoinSpecBuilder joinType(FlinkJoinType joinType) {
+ this.joinType = joinType;
+ return this;
+ }
+
+ /** Parse join info from given rel nodes and condition. */
+ public JoinSpecBuilder parseCondition(RelNode left, RelNode right,
RexNode condition) {
+ JoinInfo joinInfo = JoinUtil.createJoinInfo(left, right,
condition, filterNulls);
+ leftKeys.addAll(joinInfo.leftKeys);
+ rightKeys.addAll(joinInfo.rightKeys);
+ nonEquiConditions =
+ RexUtil.composeConjunction(
+ left.getCluster().getRexBuilder(),
joinInfo.nonEquiConditions);
+ return this;
+ }
+
+ public JoinSpec build() {
+ boolean[] filterNullsArray = new boolean[filterNulls.size()];
+ IntStream.range(0, filterNulls.size())
+ .forEach(i -> filterNullsArray[i] = filterNulls.get(i));
+ return new JoinSpec(
+ joinType,
+ leftKeys.stream().mapToInt(Integer::intValue).toArray(),
+ rightKeys.stream().mapToInt(Integer::intValue).toArray(),
+ filterNullsArray,
+ nonEquiConditions);
+ }
+ }
Review comment:
This part is more like a utility method for creating a JoinSpec from a
RelNode, so I think we can move it to JoinUtil; that would keep JoinSpec
cleaner and free of any dependency on Calcite rel classes.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]