wenlong88 commented on a change in pull request #14606:
URL: https://github.com/apache/flink/pull/14606#discussion_r556465105



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
+import 
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
 TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.util.Preconditions.checkState
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+/**
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(oproctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table 
join,
+ * the only difference is the validation, we reuse most same logic here.
+ */
+class StreamPhysicalTemporalJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftRel: RelNode,
+    rightRel: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, 
joinType)
+  with StreamPhysicalRel {
+
+  override def requireWatermark: Boolean = {
+    TemporalJoinUtil.isRowTimeJoin(joinSpec)
+  }
+
+  override def copy(
+      traitSet: RelTraitSet,
+      conditionExpr: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+    new StreamPhysicalTemporalJoin(
+      cluster,
+      traitSet,
+      left,
+      right,
+      conditionExpr,
+      joinType)
+  }
+
+  override def translateToExecNode(): ExecNode[_] = {
+    val textualRepresentation = this.toString
+    val rexBuilder = cluster.getRexBuilder
+    val isTemporalFunctionJoin =
+        TemporalJoinUtil.isTemporalFunctionJoin(rexBuilder, joinInfo)
+
+    val leftFieldCount = getLeft.getRowType.getFieldCount
+    val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+      textualRepresentation,
+      leftFieldCount,
+      joinSpec,
+      cluster.getRexBuilder,
+      isTemporalFunctionJoin)
+    val remainingNonEquiJoinPredicates =
+      temporalJoinConditionExtractor.apply(joinSpec.getNonEquiConditions)
+    val temporalJoinSpec = new JoinSpec(
+      joinSpec.getJoinType,
+      joinSpec.getLeftKeys,
+      joinSpec.getRightKeys,
+      joinSpec.getFilterNulls,
+      remainingNonEquiJoinPredicates)
+
+    val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef: 
Optional[Integer]) =
+      if (TemporalJoinUtil.isRowTimeJoin(joinSpec)) {
+        checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined 
&&
+          temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+          "Missing %s in Event-Time temporal join condition", 
TEMPORAL_JOIN_CONDITION)
+
+        val leftTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, 
textualRepresentation)
+        val rightTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.rightTimeAttribute.get, 
textualRepresentation)
+        val rightInputRef = rightTimeAttributeInputRef - leftFieldCount
+
+        (leftTimeAttributeInputRef, Optional.of(new Integer(rightInputRef)))
+      } else {
+        val leftTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, 
textualRepresentation)
+        // right time attribute defined in temporal join condition iff in 
Event time join
+        (leftTimeAttributeInputRef, 
Optional.empty().asInstanceOf[Optional[Integer]])
+      }
+
+    new StreamExecTemporalJoin(
+      temporalJoinSpec,
+      isTemporalFunctionJoin,
+      leftTimeAttributeInputRef,
+      rightRowTimeAttributeInputRef,
+      ExecEdge.DEFAULT,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription)
+  }
+
+  /**
+   * TemporalJoinConditionExtractor extracts TEMPORAL_JOIN_CONDITION from 
non-equi join conditions.
+   *
+   * <p>TimeAttributes of both sides and primary keys of right side will be 
extracted and
+   * the TEMPORAL_JOIN_CONDITION RexCall will be pruned after extraction. </p>
+   */
+  private class TemporalJoinConditionExtractor(

Review comment:
       The name comes from the original code. I think we can treat the primary
key and time attributes as part of the temporal join condition.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to