wuchong commented on a change in pull request #13300:
URL: https://github.com/apache/flink/pull/13300#discussion_r512554312



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -116,8 +128,8 @@ class TemporalJoinRewriteWithUniqueKeyRule extends 
RelOptRule(
 
     if (!primaryKeyContainedInJoinKey) {
       throw new ValidationException(
-        s"Join key must be the same as temporal table's primary key " +
-          s"in Event-time temporal table join.")
+        s"Join key must contains temporal table's primary key " +
+          s"in temporal table join.")

Review comment:
       Would be better to print what't the join key and primary key of temporal 
table. This would be useful for debugging. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -69,19 +68,33 @@ class TemporalJoinRewriteWithUniqueKeyRule extends 
RelOptRule(
 
     val newJoinCondition = joinCondition.accept(new RexShuttle {
       override def visitCall(call: RexCall): RexNode = {
-        if (call.getOperator == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION &&
-        isRowTimeTemporalTableJoin(snapshot)) {
-          val snapshotTimeInputRef = call.operands(0)
-          val rightTimeInputRef = call.operands(1)
-          val leftJoinKey = call.operands(2).asInstanceOf[RexCall].operands
-          val rightJoinKey = call.operands(3).asInstanceOf[RexCall].operands
+        if (call.getOperator == 
TemporalJoinUtil.INITIAL_TEMPORAL_JOIN_CONDITION) {
+          val (snapshotTimeInputRef, leftJoinKey, rightJoinKey) =
+            TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call) match {
+              case true =>

Review comment:
       I think it would be better to use `if else` branch for conditions 
instead of match pattern. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
   override protected def translateToPlanInternal(
     planner: StreamPlanner): Transformation[RowData] = {
 
-    throw new ValidationException("Physical node of temporal join does not 
supported yet.")
+    validateKeyTypes()
+
+    val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      planner.getTableConfig,
+      returnType,
+      leftRel,
+      rightRel,
+      getJoinInfo,
+      cluster.getRexBuilder)
+
+    val joinOperator = joinTranslator.getJoinOperator(joinType, 
returnType.getFieldNames)
+    val leftKeySelector = joinTranslator.getLeftKeySelector
+    val rightKeySelector = joinTranslator.getRightKeySelector
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    val ret = new TwoInputTransformation[RowData, RowData, RowData](
+      leftTransform,
+      rightTransform,
+      getRelDetailedDescription,
+      joinOperator,
+      InternalTypeInfo.of(returnType),
+      leftTransform.getParallelism)
+
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+    
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
+  }
+
+  private def validateKeyTypes(): Unit = {
+    // at least one equality expression
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    getJoinInfo.pairs().toList.foreach(pair => {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+      // check if keys are compatible
+      if (leftKeyType != rightKeyType) {
+        throw new TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
+            s"\tCondition: (${RelExplainUtil.expressionToString(
+              getCondition, inputRowType, getExpressionString)})"
+        )
+      }
+    })
+  }
+}
+
+/**
+  * @param rightRowTimeAttributeInputReference is defined only for event time 
joins.
+  */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+  textualRepresentation: String,
+  config: TableConfig,
+  returnType: RowType,
+  leftInputType: RowType,
+  rightInputType: RowType,
+  joinInfo: JoinInfo,
+  rexBuilder: RexBuilder,
+  leftTimeAttributeInputReference: Int,
+  rightRowTimeAttributeInputReference: Option[Int],
+  remainingNonEquiJoinPredicates: RexNode,
+  isTemporalFunctionJoin: Boolean) {
+
+  val nonEquiJoinPredicates: Option[RexNode] = 
Some(remainingNonEquiJoinPredicates)
+
+  def getLeftKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.leftKeys.toIntArray,
+      InternalTypeInfo.of(leftInputType)
+    )
+  }
+
+  def getRightKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.rightKeys.toIntArray,
+      InternalTypeInfo.of(rightInputType)
+    )
+  }
+
+  def getJoinOperator(
+    joinType: JoinRelType,
+    returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData, 
RowData] = {
+
+    // input must not be nullable, because the runtime join function will make 
sure
+    // the code-generated function won't process null inputs
+    val ctx = CodeGeneratorContext(config)
+    val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+      .bindInput(leftInputType)
+      .bindSecondInput(rightInputType)
+
+    val body = if (nonEquiJoinPredicates.isEmpty) {
+      // only equality condition
+      "return true;"
+    } else {
+      val condition = 
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+      s"""
+         |${condition.code}
+         |return ${condition.resultTerm};
+         |""".stripMargin
+    }
+
+    val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+      ctx,
+      "ConditionFunction",
+      body)
+
+    createJoinOperator(config, joinType, generatedJoinCondition)
+  }
+
+  protected def createJoinOperator(
+    tableConfig: TableConfig,
+    joinType: JoinRelType,
+    generatedJoinCondition: GeneratedJoinCondition)
+  : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+    if (isTemporalFunctionJoin) {
+      if (joinType != JoinRelType.INNER) {
+        throw new ValidationException(
+          "Temporal table function join currently only support INNER JOIN and 
LEFT JOIN, " +
+            "but was " + joinType.toString + " JOIN")

Review comment:
       ```suggestion
             "Temporal table function join currently only support INNER JOIN, " 
+
               "but was " + joinType.toString + " JOIN.")
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -186,7 +280,8 @@ object TemporalJoinUtil {
     var rowtimeJoin: Boolean = false
     val visitor = new RexVisitorImpl[Unit](true) {
       override def visitCall(call: RexCall): Unit = {
-        if (isRowTimeTemporalJoinConditionCall(call)) {
+        if (isRowTimeTemporalTableJoinCon(call) ||
+        isRowTimeTemporalFunctionJoinCon(call)) {

Review comment:
       Indent.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
   override protected def translateToPlanInternal(
     planner: StreamPlanner): Transformation[RowData] = {
 
-    throw new ValidationException("Physical node of temporal join does not 
supported yet.")
+    validateKeyTypes()
+
+    val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      planner.getTableConfig,
+      returnType,
+      leftRel,
+      rightRel,
+      getJoinInfo,
+      cluster.getRexBuilder)
+
+    val joinOperator = joinTranslator.getJoinOperator(joinType, 
returnType.getFieldNames)
+    val leftKeySelector = joinTranslator.getLeftKeySelector
+    val rightKeySelector = joinTranslator.getRightKeySelector
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    val ret = new TwoInputTransformation[RowData, RowData, RowData](
+      leftTransform,
+      rightTransform,
+      getRelDetailedDescription,
+      joinOperator,
+      InternalTypeInfo.of(returnType),
+      leftTransform.getParallelism)
+
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+    
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
+  }
+
+  private def validateKeyTypes(): Unit = {
+    // at least one equality expression
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    getJoinInfo.pairs().toList.foreach(pair => {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+      // check if keys are compatible
+      if (leftKeyType != rightKeyType) {
+        throw new TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
+            s"\tCondition: (${RelExplainUtil.expressionToString(
+              getCondition, inputRowType, getExpressionString)})"
+        )
+      }
+    })
+  }
+}
+
+/**
+  * @param rightRowTimeAttributeInputReference is defined only for event time 
joins.
+  */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+  textualRepresentation: String,
+  config: TableConfig,
+  returnType: RowType,
+  leftInputType: RowType,
+  rightInputType: RowType,
+  joinInfo: JoinInfo,
+  rexBuilder: RexBuilder,
+  leftTimeAttributeInputReference: Int,
+  rightRowTimeAttributeInputReference: Option[Int],
+  remainingNonEquiJoinPredicates: RexNode,
+  isTemporalFunctionJoin: Boolean) {

Review comment:
       Add one more indent for constructor parameters.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -287,10 +287,10 @@ abstract class 
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
             s" but no row time attribute can be found.")
       }
       // Deal primary key in TemporalJoinRewriteUniqueKeyRule
-      TemporalJoinUtil.makeRowTimeTemporalJoinConditionCall(rexBuilder, 
snapshotTimeInputRef,
+      TemporalJoinUtil.makeInitialRowTimeTemporalTableJoinCondCall(rexBuilder, 
snapshotTimeInputRef,

Review comment:
       Split parameters into separate lines. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
   override protected def translateToPlanInternal(
     planner: StreamPlanner): Transformation[RowData] = {
 
-    throw new ValidationException("Physical node of temporal join does not 
supported yet.")
+    validateKeyTypes()
+
+    val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      planner.getTableConfig,
+      returnType,
+      leftRel,
+      rightRel,
+      getJoinInfo,
+      cluster.getRexBuilder)
+
+    val joinOperator = joinTranslator.getJoinOperator(joinType, 
returnType.getFieldNames)
+    val leftKeySelector = joinTranslator.getLeftKeySelector
+    val rightKeySelector = joinTranslator.getRightKeySelector
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    val ret = new TwoInputTransformation[RowData, RowData, RowData](
+      leftTransform,
+      rightTransform,
+      getRelDetailedDescription,
+      joinOperator,
+      InternalTypeInfo.of(returnType),
+      leftTransform.getParallelism)
+
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+    
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
+  }
+
+  private def validateKeyTypes(): Unit = {
+    // at least one equality expression
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    getJoinInfo.pairs().toList.foreach(pair => {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+      // check if keys are compatible
+      if (leftKeyType != rightKeyType) {
+        throw new TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
+            s"\tCondition: (${RelExplainUtil.expressionToString(
+              getCondition, inputRowType, getExpressionString)})"
+        )
+      }
+    })
+  }
+}
+
+/**
+  * @param rightRowTimeAttributeInputReference is defined only for event time 
joins.
+  */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+  textualRepresentation: String,
+  config: TableConfig,
+  returnType: RowType,
+  leftInputType: RowType,
+  rightInputType: RowType,
+  joinInfo: JoinInfo,
+  rexBuilder: RexBuilder,
+  leftTimeAttributeInputReference: Int,
+  rightRowTimeAttributeInputReference: Option[Int],
+  remainingNonEquiJoinPredicates: RexNode,
+  isTemporalFunctionJoin: Boolean) {
+
+  val nonEquiJoinPredicates: Option[RexNode] = 
Some(remainingNonEquiJoinPredicates)
+
+  def getLeftKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.leftKeys.toIntArray,
+      InternalTypeInfo.of(leftInputType)
+    )
+  }
+
+  def getRightKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.rightKeys.toIntArray,
+      InternalTypeInfo.of(rightInputType)
+    )
+  }
+
+  def getJoinOperator(
+    joinType: JoinRelType,
+    returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData, 
RowData] = {
+
+    // input must not be nullable, because the runtime join function will make 
sure
+    // the code-generated function won't process null inputs
+    val ctx = CodeGeneratorContext(config)
+    val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+      .bindInput(leftInputType)
+      .bindSecondInput(rightInputType)
+
+    val body = if (nonEquiJoinPredicates.isEmpty) {
+      // only equality condition
+      "return true;"
+    } else {
+      val condition = 
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+      s"""
+         |${condition.code}
+         |return ${condition.resultTerm};
+         |""".stripMargin
+    }
+
+    val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+      ctx,
+      "ConditionFunction",
+      body)
+
+    createJoinOperator(config, joinType, generatedJoinCondition)
+  }
+
+  protected def createJoinOperator(
+    tableConfig: TableConfig,
+    joinType: JoinRelType,
+    generatedJoinCondition: GeneratedJoinCondition)
+  : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+    if (isTemporalFunctionJoin) {
+      if (joinType != JoinRelType.INNER) {
+        throw new ValidationException(
+          "Temporal table function join currently only support INNER JOIN and 
LEFT JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    } else {
+      if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+        throw new TableException(
+          "Temporal table join currently only support INNER JOIN and LEFT 
JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    }
+
+    val isLeftOuterJoin = joinType == JoinRelType.LEFT
+    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+    val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+    if (rightRowTimeAttributeInputReference.isDefined) {
+      if (isTemporalFunctionJoin) {
+        new LegacyTemporalRowTimeJoinOperator(
+          InternalTypeInfo.of(leftInputType),
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          leftTimeAttributeInputReference,
+          rightRowTimeAttributeInputReference.get,
+          minRetentionTime,
+          maxRetentionTime)
+      } else {
+        throw new TableException("Event-time temporal join operator is not 
implemented yet.")
+      }
+    } else {
+      if (isTemporalFunctionJoin) {
+        new TemporalProcessTimeJoinOperator(
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          minRetentionTime,
+          maxRetentionTime,
+          isLeftOuterJoin)
+      } else {
+        // current Processing-time temporal join implements has been finished, 
but the version from

Review comment:
       > The exsiting TemporalProcessTimeJoinOperator has already supported 
temporal table join.
   > However, the semantic of this implementation is problematic, because the 
join processing for left stream doesn't wait for the complete snapshot of 
temporal table, this may mislead users in production environment.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,25 +18,42 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import java.util
+
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.{TableConfig, TableException, 
ValidationException}
 import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, 
StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
+import 
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
 TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor, 
KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector
+import 
org.apache.flink.table.runtime.operators.join.temporal.{LegacyTemporalRowTimeJoinOperator,
 TemporalProcessTimeJoinOperator}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions.checkState
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
 import org.apache.calcite.rex._
-
-import java.util
+import 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.{isProctimeIndicatorType,
 isRowtimeIndicatorType}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
- * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF).
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table 
join,
+ * the only difference is the validation, We reuse most same logic here;

Review comment:
       ```suggestion
    * the only difference is the validation, we reuse most same logic here.
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
   override protected def translateToPlanInternal(
     planner: StreamPlanner): Transformation[RowData] = {
 
-    throw new ValidationException("Physical node of temporal join does not 
supported yet.")
+    validateKeyTypes()
+
+    val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      planner.getTableConfig,
+      returnType,
+      leftRel,
+      rightRel,
+      getJoinInfo,
+      cluster.getRexBuilder)
+
+    val joinOperator = joinTranslator.getJoinOperator(joinType, 
returnType.getFieldNames)
+    val leftKeySelector = joinTranslator.getLeftKeySelector
+    val rightKeySelector = joinTranslator.getRightKeySelector
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    val ret = new TwoInputTransformation[RowData, RowData, RowData](
+      leftTransform,
+      rightTransform,
+      getRelDetailedDescription,
+      joinOperator,
+      InternalTypeInfo.of(returnType),
+      leftTransform.getParallelism)
+
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+    
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
+  }
+
+  private def validateKeyTypes(): Unit = {
+    // at least one equality expression
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    getJoinInfo.pairs().toList.foreach(pair => {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+      // check if keys are compatible
+      if (leftKeyType != rightKeyType) {
+        throw new TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
+            s"\tCondition: (${RelExplainUtil.expressionToString(
+              getCondition, inputRowType, getExpressionString)})"
+        )
+      }
+    })
+  }
+}
+
+/**
+  * @param rightRowTimeAttributeInputReference is defined only for event time 
joins.
+  */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+  textualRepresentation: String,
+  config: TableConfig,
+  returnType: RowType,
+  leftInputType: RowType,
+  rightInputType: RowType,
+  joinInfo: JoinInfo,
+  rexBuilder: RexBuilder,
+  leftTimeAttributeInputReference: Int,
+  rightRowTimeAttributeInputReference: Option[Int],
+  remainingNonEquiJoinPredicates: RexNode,
+  isTemporalFunctionJoin: Boolean) {
+
+  val nonEquiJoinPredicates: Option[RexNode] = 
Some(remainingNonEquiJoinPredicates)
+
+  def getLeftKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.leftKeys.toIntArray,
+      InternalTypeInfo.of(leftInputType)
+    )
+  }
+
+  def getRightKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.rightKeys.toIntArray,
+      InternalTypeInfo.of(rightInputType)
+    )
+  }
+
+  def getJoinOperator(
+    joinType: JoinRelType,
+    returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData, 
RowData] = {
+
+    // input must not be nullable, because the runtime join function will make 
sure
+    // the code-generated function won't process null inputs
+    val ctx = CodeGeneratorContext(config)
+    val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+      .bindInput(leftInputType)
+      .bindSecondInput(rightInputType)
+
+    val body = if (nonEquiJoinPredicates.isEmpty) {
+      // only equality condition
+      "return true;"
+    } else {
+      val condition = 
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+      s"""
+         |${condition.code}
+         |return ${condition.resultTerm};
+         |""".stripMargin
+    }
+
+    val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+      ctx,
+      "ConditionFunction",
+      body)
+
+    createJoinOperator(config, joinType, generatedJoinCondition)
+  }
+
+  protected def createJoinOperator(
+    tableConfig: TableConfig,
+    joinType: JoinRelType,
+    generatedJoinCondition: GeneratedJoinCondition)
+  : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+    if (isTemporalFunctionJoin) {
+      if (joinType != JoinRelType.INNER) {
+        throw new ValidationException(
+          "Temporal table function join currently only support INNER JOIN and 
LEFT JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    } else {
+      if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+        throw new TableException(
+          "Temporal table join currently only support INNER JOIN and LEFT 
JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    }
+
+    val isLeftOuterJoin = joinType == JoinRelType.LEFT
+    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+    val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+    if (rightRowTimeAttributeInputReference.isDefined) {
+      if (isTemporalFunctionJoin) {
+        new LegacyTemporalRowTimeJoinOperator(
+          InternalTypeInfo.of(leftInputType),
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          leftTimeAttributeInputReference,
+          rightRowTimeAttributeInputReference.get,
+          minRetentionTime,
+          maxRetentionTime)
+      } else {
+        throw new TableException("Event-time temporal join operator is not 
implemented yet.")
+      }
+    } else {
+      if (isTemporalFunctionJoin) {
+        new TemporalProcessTimeJoinOperator(
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          minRetentionTime,
+          maxRetentionTime,
+          isLeftOuterJoin)
+      } else {
+        // current Processing-time temporal join implements has been finished, 
but the version from
+        // versioned table may be uncompleted, it may mislead users in 
production environment.
+        // So, we disable this feature here, see FLINK-xx for more details.

Review comment:
       create the issue and fill up the issue id. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
   override protected def translateToPlanInternal(
     planner: StreamPlanner): Transformation[RowData] = {
 
-    throw new ValidationException("Physical node of temporal join does not 
supported yet.")
+    validateKeyTypes()
+
+    val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      planner.getTableConfig,
+      returnType,
+      leftRel,
+      rightRel,
+      getJoinInfo,
+      cluster.getRexBuilder)
+
+    val joinOperator = joinTranslator.getJoinOperator(joinType, 
returnType.getFieldNames)
+    val leftKeySelector = joinTranslator.getLeftKeySelector
+    val rightKeySelector = joinTranslator.getRightKeySelector
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    val ret = new TwoInputTransformation[RowData, RowData, RowData](
+      leftTransform,
+      rightTransform,
+      getRelDetailedDescription,
+      joinOperator,
+      InternalTypeInfo.of(returnType),
+      leftTransform.getParallelism)
+
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+    
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
+  }
+
+  private def validateKeyTypes(): Unit = {
+    // at least one equality expression
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    getJoinInfo.pairs().toList.foreach(pair => {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+      // check if keys are compatible
+      if (leftKeyType != rightKeyType) {
+        throw new TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
+            s"\tCondition: (${RelExplainUtil.expressionToString(
+              getCondition, inputRowType, getExpressionString)})"
+        )
+      }
+    })
+  }
+}
+
+/**
+  * @param rightRowTimeAttributeInputReference is defined only for event time 
joins.
+  */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+  textualRepresentation: String,
+  config: TableConfig,
+  returnType: RowType,
+  leftInputType: RowType,
+  rightInputType: RowType,
+  joinInfo: JoinInfo,
+  rexBuilder: RexBuilder,
+  leftTimeAttributeInputReference: Int,
+  rightRowTimeAttributeInputReference: Option[Int],
+  remainingNonEquiJoinPredicates: RexNode,
+  isTemporalFunctionJoin: Boolean) {
+
+  val nonEquiJoinPredicates: Option[RexNode] = 
Some(remainingNonEquiJoinPredicates)
+
+  def getLeftKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.leftKeys.toIntArray,
+      InternalTypeInfo.of(leftInputType)
+    )
+  }
+
+  def getRightKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.rightKeys.toIntArray,
+      InternalTypeInfo.of(rightInputType)
+    )
+  }
+
+  def getJoinOperator(
+    joinType: JoinRelType,
+    returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData, 
RowData] = {
+
+    // input must not be nullable, because the runtime join function will make 
sure
+    // the code-generated function won't process null inputs
+    val ctx = CodeGeneratorContext(config)
+    val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+      .bindInput(leftInputType)
+      .bindSecondInput(rightInputType)
+
+    val body = if (nonEquiJoinPredicates.isEmpty) {
+      // only equality condition
+      "return true;"
+    } else {
+      val condition = 
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+      s"""
+         |${condition.code}
+         |return ${condition.resultTerm};
+         |""".stripMargin
+    }
+
+    val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+      ctx,
+      "ConditionFunction",
+      body)
+
+    createJoinOperator(config, joinType, generatedJoinCondition)
+  }
+
+  protected def createJoinOperator(
+    tableConfig: TableConfig,
+    joinType: JoinRelType,
+    generatedJoinCondition: GeneratedJoinCondition)
+  : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+    if (isTemporalFunctionJoin) {
+      if (joinType != JoinRelType.INNER) {
+        throw new ValidationException(
+          "Temporal table function join currently only support INNER JOIN and 
LEFT JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    } else {
+      if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+        throw new TableException(
+          "Temporal table join currently only support INNER JOIN and LEFT 
JOIN, " +
+            "but was " + joinType.toString + " JOIN")

Review comment:
       ```suggestion
               "but was " + joinType.toString + " JOIN.")
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,25 +18,42 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import java.util
+
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.{TableConfig, TableException, 
ValidationException}
 import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, 
StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
+import 
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
 TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor, 
KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector
+import 
org.apache.flink.table.runtime.operators.join.temporal.{LegacyTemporalRowTimeJoinOperator,
 TemporalProcessTimeJoinOperator}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions.checkState
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
 import org.apache.calcite.rex._
-
-import java.util
+import 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.{isProctimeIndicatorType,
 isRowtimeIndicatorType}

Review comment:
       Reorder imports.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,25 +18,42 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import java.util
+
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.{TableConfig, TableException, 
ValidationException}
 import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, 
StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
+import 
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
 TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor, 
KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector
+import 
org.apache.flink.table.runtime.operators.join.temporal.{LegacyTemporalRowTimeJoinOperator,
 TemporalProcessTimeJoinOperator}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions.checkState
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
 import org.apache.calcite.rex._
-
-import java.util
+import 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.{isProctimeIndicatorType,
 isRowtimeIndicatorType}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
- * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF).
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime).

Review comment:
       ```suggestion
    * temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)).
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
   override protected def translateToPlanInternal(
     planner: StreamPlanner): Transformation[RowData] = {
 
-    throw new ValidationException("Physical node of temporal join does not 
supported yet.")
+    validateKeyTypes()
+
+    val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      planner.getTableConfig,
+      returnType,
+      leftRel,
+      rightRel,
+      getJoinInfo,
+      cluster.getRexBuilder)
+
+    val joinOperator = joinTranslator.getJoinOperator(joinType, 
returnType.getFieldNames)
+    val leftKeySelector = joinTranslator.getLeftKeySelector
+    val rightKeySelector = joinTranslator.getRightKeySelector
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    val ret = new TwoInputTransformation[RowData, RowData, RowData](
+      leftTransform,
+      rightTransform,
+      getRelDetailedDescription,
+      joinOperator,
+      InternalTypeInfo.of(returnType),
+      leftTransform.getParallelism)
+
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+    
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
+  }
+
+  private def validateKeyTypes(): Unit = {
+    // at least one equality expression
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    getJoinInfo.pairs().toList.foreach(pair => {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+      // check if keys are compatible
+      if (leftKeyType != rightKeyType) {
+        throw new TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
+            s"\tCondition: (${RelExplainUtil.expressionToString(
+              getCondition, inputRowType, getExpressionString)})"
+        )
+      }
+    })
+  }
+}
+
+/**
+  * @param rightRowTimeAttributeInputReference is defined only for event time 
joins.
+  */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+  textualRepresentation: String,
+  config: TableConfig,
+  returnType: RowType,
+  leftInputType: RowType,
+  rightInputType: RowType,
+  joinInfo: JoinInfo,
+  rexBuilder: RexBuilder,
+  leftTimeAttributeInputReference: Int,
+  rightRowTimeAttributeInputReference: Option[Int],
+  remainingNonEquiJoinPredicates: RexNode,
+  isTemporalFunctionJoin: Boolean) {
+
+  val nonEquiJoinPredicates: Option[RexNode] = 
Some(remainingNonEquiJoinPredicates)
+
+  def getLeftKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.leftKeys.toIntArray,
+      InternalTypeInfo.of(leftInputType)
+    )
+  }
+
+  def getRightKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.rightKeys.toIntArray,
+      InternalTypeInfo.of(rightInputType)
+    )
+  }
+
+  def getJoinOperator(
+    joinType: JoinRelType,
+    returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData, 
RowData] = {
+
+    // input must not be nullable, because the runtime join function will make 
sure
+    // the code-generated function won't process null inputs
+    val ctx = CodeGeneratorContext(config)
+    val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+      .bindInput(leftInputType)
+      .bindSecondInput(rightInputType)
+
+    val body = if (nonEquiJoinPredicates.isEmpty) {
+      // only equality condition
+      "return true;"
+    } else {
+      val condition = 
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+      s"""
+         |${condition.code}
+         |return ${condition.resultTerm};
+         |""".stripMargin
+    }
+
+    val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+      ctx,
+      "ConditionFunction",
+      body)
+
+    createJoinOperator(config, joinType, generatedJoinCondition)
+  }
+
+  protected def createJoinOperator(
+    tableConfig: TableConfig,
+    joinType: JoinRelType,
+    generatedJoinCondition: GeneratedJoinCondition)
+  : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+    if (isTemporalFunctionJoin) {
+      if (joinType != JoinRelType.INNER) {
+        throw new ValidationException(
+          "Temporal table function join currently only support INNER JOIN and 
LEFT JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    } else {
+      if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+        throw new TableException(
+          "Temporal table join currently only support INNER JOIN and LEFT 
JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    }
+
+    val isLeftOuterJoin = joinType == JoinRelType.LEFT
+    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+    val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+    if (rightRowTimeAttributeInputReference.isDefined) {
+      if (isTemporalFunctionJoin) {
+        new LegacyTemporalRowTimeJoinOperator(
+          InternalTypeInfo.of(leftInputType),
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          leftTimeAttributeInputReference,
+          rightRowTimeAttributeInputReference.get,
+          minRetentionTime,
+          maxRetentionTime)
+      } else {
+        throw new TableException("Event-time temporal join operator is not 
implemented yet.")
+      }
+    } else {
+      if (isTemporalFunctionJoin) {
+        new TemporalProcessTimeJoinOperator(
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          minRetentionTime,
+          maxRetentionTime,
+          isLeftOuterJoin)
+      } else {
+        // current Processing-time temporal join implements has been finished, 
but the version from
+        // versioned table may be uncompleted, it may mislead users in 
production environment.
+        // So, we disable this feature here, see FLINK-xx for more details.
+        throw new TableException("Processing-time temporal join is not 
supported yet.")
+      }
+    }
+  }
+}
+
+object StreamExecTemporalJoinToCoProcessTranslator {
+  def create(
+    textualRepresentation: String,
+    config: TableConfig,
+    returnType: RowType,
+    leftInput: RelNode,
+    rightInput: RelNode,
+    joinInfo: JoinInfo,
+    rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
+
+    val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
+    val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
+    val isTemporalFunctionJoin = 
TemporalJoinUtil.isTemporalFunctionJoin(rexBuilder, joinInfo)
+
+    checkState(
+      !joinInfo.isEqui,
+      "Missing %s in temporal join condition",
+      TEMPORAL_JOIN_CONDITION)
+
+    val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+      textualRepresentation,
+      leftType.getFieldCount,
+      joinInfo,
+      rexBuilder,
+      isTemporalFunctionJoin)
+
+    val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+    val remainingNonEquiJoinPredicates = 
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
+
+    val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef: 
Option[Int]) =
+      if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+        checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined 
&&
+          temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+          "Missing %s in Event-Time temporal join condition", 
TEMPORAL_JOIN_CONDITION)
+
+        val leftTimeAttributeInputRef = extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, 
textualRepresentation)
+        val rightTimeAttributeInputRef = extractInputRef(
+          temporalJoinConditionExtractor.rightTimeAttribute.get, 
textualRepresentation)
+        val rightInputRef = rightTimeAttributeInputRef - leftType.getFieldCount
+
+        (leftTimeAttributeInputRef, Some(rightInputRef))
+      } else {
+        val leftTimeAttributeInputRef = extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, 
textualRepresentation)
+        // right time attribute defined in temporal join condition iff in 
Event time join
+        (leftTimeAttributeInputRef, None)
+      }
+
+    new StreamExecTemporalJoinToCoProcessTranslator(
+      textualRepresentation,
+      config,
+      returnType,
+      leftType,
+      rightType,
+      joinInfo,
+      rexBuilder,
+      leftTimeAttributeInputRef,
+      rightRowTimeAttributeInputRef,
+      remainingNonEquiJoinPredicates,
+      isTemporalFunctionJoin)
+  }
+
+  private def extractInputRef(rexNode: RexNode, textualRepresentation: 
String): Int = {
+    val inputReferenceVisitor = new InputRefVisitor
+    rexNode.accept(inputReferenceVisitor)
+    checkState(
+      inputReferenceVisitor.getFields.length == 1,
+      "Failed to find input reference in [%s]",
+      textualRepresentation)
+    inputReferenceVisitor.getFields.head
+  }
+
+  private class TemporalJoinConditionExtractor(
+    textualRepresentation: String,
+    rightKeysStartingOffset: Int,
+    joinInfo: JoinInfo,
+    rexBuilder: RexBuilder,
+    isTemporalFunctionJoin: Boolean) extends RexShuttle {
+
+    var leftTimeAttribute: Option[RexNode] = None
+
+    var rightTimeAttribute: Option[RexNode] = None
+
+    var rightPrimaryKey: Option[Array[RexNode]] = None
+
+    override def visitCall(call: RexCall): RexNode = {
+      if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+        return super.visitCall(call)
+      }
+      // the condition of temporal function comes from WHERE clause,
+      // so it's not been validated in logical plan
+      if (isTemporalFunctionJoin) {
+        validateAndExtractTemporalFunctionCondition(call)

Review comment:
       I would suggest to move the special validation logic into 
`TemporalJoinUtil` and unify the time attribute & primary key extraction for 
both legacy and new temporal join. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -98,9 +111,8 @@ class TemporalJoinRewriteWithUniqueKeyRule extends 
RelOptRule(
       rightPrimaryKeyInputRefs: Option[Seq[RexNode]]): Unit = {
 
     if (rightPrimaryKeyInputRefs.isEmpty) {
-      throw new ValidationException("Event-Time Temporal Table Join requires 
both" +
-        s" primary key and row time attribute in versioned table," +
-        s" but no primary key can be found.")
+      throw new ValidationException("Temporal Table Join requires" +
+        s" primary key in versioned table, but no primary key can be found.")

Review comment:
       Would be better to print the plan tree which is helpful for debugging.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
   override protected def translateToPlanInternal(
     planner: StreamPlanner): Transformation[RowData] = {
 
-    throw new ValidationException("Physical node of temporal join does not 
supported yet.")
+    validateKeyTypes()
+
+    val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+    val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+      this.toString,
+      planner.getTableConfig,
+      returnType,
+      leftRel,
+      rightRel,
+      getJoinInfo,
+      cluster.getRexBuilder)
+
+    val joinOperator = joinTranslator.getJoinOperator(joinType, 
returnType.getFieldNames)
+    val leftKeySelector = joinTranslator.getLeftKeySelector
+    val rightKeySelector = joinTranslator.getRightKeySelector
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    val ret = new TwoInputTransformation[RowData, RowData, RowData](
+      leftTransform,
+      rightTransform,
+      getRelDetailedDescription,
+      joinOperator,
+      InternalTypeInfo.of(returnType),
+      leftTransform.getParallelism)
+
+    if (inputsContainSingleton()) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+    
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
+  }
+
+  private def validateKeyTypes(): Unit = {
+    // at least one equality expression
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    getJoinInfo.pairs().toList.foreach(pair => {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+      // check if keys are compatible
+      if (leftKeyType != rightKeyType) {
+        throw new TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
+            s"\tCondition: (${RelExplainUtil.expressionToString(
+              getCondition, inputRowType, getExpressionString)})"
+        )
+      }
+    })
+  }
+}
+
+/**
+  * @param rightRowTimeAttributeInputReference is defined only for event time 
joins.
+  */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+  textualRepresentation: String,
+  config: TableConfig,
+  returnType: RowType,
+  leftInputType: RowType,
+  rightInputType: RowType,
+  joinInfo: JoinInfo,
+  rexBuilder: RexBuilder,
+  leftTimeAttributeInputReference: Int,
+  rightRowTimeAttributeInputReference: Option[Int],
+  remainingNonEquiJoinPredicates: RexNode,
+  isTemporalFunctionJoin: Boolean) {
+
+  val nonEquiJoinPredicates: Option[RexNode] = 
Some(remainingNonEquiJoinPredicates)
+
+  def getLeftKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.leftKeys.toIntArray,
+      InternalTypeInfo.of(leftInputType)
+    )
+  }
+
+  def getRightKeySelector: RowDataKeySelector = {
+    KeySelectorUtil.getRowDataSelector(
+      joinInfo.rightKeys.toIntArray,
+      InternalTypeInfo.of(rightInputType)
+    )
+  }
+
+  def getJoinOperator(
+    joinType: JoinRelType,
+    returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData, 
RowData] = {
+
+    // input must not be nullable, because the runtime join function will make 
sure
+    // the code-generated function won't process null inputs
+    val ctx = CodeGeneratorContext(config)
+    val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+      .bindInput(leftInputType)
+      .bindSecondInput(rightInputType)
+
+    val body = if (nonEquiJoinPredicates.isEmpty) {
+      // only equality condition
+      "return true;"
+    } else {
+      val condition = 
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+      s"""
+         |${condition.code}
+         |return ${condition.resultTerm};
+         |""".stripMargin
+    }
+
+    val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+      ctx,
+      "ConditionFunction",
+      body)
+
+    createJoinOperator(config, joinType, generatedJoinCondition)
+  }
+
+  protected def createJoinOperator(
+    tableConfig: TableConfig,
+    joinType: JoinRelType,
+    generatedJoinCondition: GeneratedJoinCondition)
+  : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+    if (isTemporalFunctionJoin) {
+      if (joinType != JoinRelType.INNER) {
+        throw new ValidationException(
+          "Temporal table function join currently only support INNER JOIN and 
LEFT JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    } else {
+      if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+        throw new TableException(
+          "Temporal table join currently only support INNER JOIN and LEFT 
JOIN, " +
+            "but was " + joinType.toString + " JOIN")
+      }
+    }
+
+    val isLeftOuterJoin = joinType == JoinRelType.LEFT
+    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+    val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+    if (rightRowTimeAttributeInputReference.isDefined) {
+      if (isTemporalFunctionJoin) {
+        new LegacyTemporalRowTimeJoinOperator(
+          InternalTypeInfo.of(leftInputType),
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          leftTimeAttributeInputReference,
+          rightRowTimeAttributeInputReference.get,
+          minRetentionTime,
+          maxRetentionTime)
+      } else {
+        throw new TableException("Event-time temporal join operator is not 
implemented yet.")
+      }
+    } else {
+      if (isTemporalFunctionJoin) {
+        new TemporalProcessTimeJoinOperator(
+          InternalTypeInfo.of(rightInputType),
+          generatedJoinCondition,
+          minRetentionTime,
+          maxRetentionTime,
+          isLeftOuterJoin)
+      } else {
+        // current Processing-time temporal join implements has been finished, 
but the version from
+        // versioned table may be uncompleted, it may mislead users in 
production environment.
+        // So, we disable this feature here, see FLINK-xx for more details.
+        throw new TableException("Processing-time temporal join is not 
supported yet.")
+      }
+    }
+  }
+}
+
+object StreamExecTemporalJoinToCoProcessTranslator {
+  def create(
+    textualRepresentation: String,
+    config: TableConfig,
+    returnType: RowType,
+    leftInput: RelNode,
+    rightInput: RelNode,
+    joinInfo: JoinInfo,
+    rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
+
+    val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
+    val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
+    val isTemporalFunctionJoin = 
TemporalJoinUtil.isTemporalFunctionJoin(rexBuilder, joinInfo)
+
+    checkState(
+      !joinInfo.isEqui,
+      "Missing %s in temporal join condition",
+      TEMPORAL_JOIN_CONDITION)
+
+    val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+      textualRepresentation,
+      leftType.getFieldCount,
+      joinInfo,
+      rexBuilder,
+      isTemporalFunctionJoin)
+
+    val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+    val remainingNonEquiJoinPredicates = 
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
+
+    val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef: 
Option[Int]) =
+      if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+        checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined 
&&
+          temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+          "Missing %s in Event-Time temporal join condition", 
TEMPORAL_JOIN_CONDITION)
+
+        val leftTimeAttributeInputRef = extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, 
textualRepresentation)
+        val rightTimeAttributeInputRef = extractInputRef(
+          temporalJoinConditionExtractor.rightTimeAttribute.get, 
textualRepresentation)
+        val rightInputRef = rightTimeAttributeInputRef - leftType.getFieldCount
+
+        (leftTimeAttributeInputRef, Some(rightInputRef))
+      } else {
+        val leftTimeAttributeInputRef = extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, 
textualRepresentation)
+        // right time attribute defined in temporal join condition iff in 
Event time join
+        (leftTimeAttributeInputRef, None)
+      }
+
+    new StreamExecTemporalJoinToCoProcessTranslator(
+      textualRepresentation,
+      config,
+      returnType,
+      leftType,
+      rightType,
+      joinInfo,
+      rexBuilder,
+      leftTimeAttributeInputRef,
+      rightRowTimeAttributeInputRef,
+      remainingNonEquiJoinPredicates,
+      isTemporalFunctionJoin)
+  }
+
+  private def extractInputRef(rexNode: RexNode, textualRepresentation: 
String): Int = {
+    val inputReferenceVisitor = new InputRefVisitor
+    rexNode.accept(inputReferenceVisitor)
+    checkState(
+      inputReferenceVisitor.getFields.length == 1,
+      "Failed to find input reference in [%s]",
+      textualRepresentation)
+    inputReferenceVisitor.getFields.head
+  }
+
+  private class TemporalJoinConditionExtractor(
+    textualRepresentation: String,
+    rightKeysStartingOffset: Int,
+    joinInfo: JoinInfo,
+    rexBuilder: RexBuilder,
+    isTemporalFunctionJoin: Boolean) extends RexShuttle {
+
+    var leftTimeAttribute: Option[RexNode] = None
+
+    var rightTimeAttribute: Option[RexNode] = None
+
+    var rightPrimaryKey: Option[Array[RexNode]] = None
+
+    override def visitCall(call: RexCall): RexNode = {
+      if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+        return super.visitCall(call)
+      }
+      // the condition of temporal function comes from WHERE clause,
+      // so it's not been validated in logical plan
+      if (isTemporalFunctionJoin) {
+        validateAndExtractTemporalFunctionCondition(call)
+      }
+      // temporal table join has been validated in logical plan, only do 
extract here
+      else {
+        if (TemporalJoinUtil.isRowTimeTemporalTableJoinCon(call)) {
+          leftTimeAttribute = Some(call.getOperands.get(0))
+          rightTimeAttribute = Some(call.getOperands.get(1))
+          rightPrimaryKey = 
Some(extractPrimaryKeyArray(call.getOperands.get(4)))

Review comment:
       We can put the primary key at the 2nd position, then the extraction 
logic should be the same with legacy temporal join. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to