wuchong commented on a change in pull request #13300:
URL: https://github.com/apache/flink/pull/13300#discussion_r512554312
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -116,8 +128,8 @@ class TemporalJoinRewriteWithUniqueKeyRule extends
RelOptRule(
if (!primaryKeyContainedInJoinKey) {
throw new ValidationException(
- s"Join key must be the same as temporal table's primary key " +
- s"in Event-time temporal table join.")
+ s"Join key must contains temporal table's primary key " +
+ s"in temporal table join.")
Review comment:
It would be better to include the join key and the primary key of the temporal
table in this message. This would be useful for debugging.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -69,19 +68,33 @@ class TemporalJoinRewriteWithUniqueKeyRule extends
RelOptRule(
val newJoinCondition = joinCondition.accept(new RexShuttle {
override def visitCall(call: RexCall): RexNode = {
- if (call.getOperator == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION &&
- isRowTimeTemporalTableJoin(snapshot)) {
- val snapshotTimeInputRef = call.operands(0)
- val rightTimeInputRef = call.operands(1)
- val leftJoinKey = call.operands(2).asInstanceOf[RexCall].operands
- val rightJoinKey = call.operands(3).asInstanceOf[RexCall].operands
+ if (call.getOperator ==
TemporalJoinUtil.INITIAL_TEMPORAL_JOIN_CONDITION) {
+ val (snapshotTimeInputRef, leftJoinKey, rightJoinKey) =
+ TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call) match {
+ case true =>
Review comment:
I think it would be better to use an `if`/`else` branch for this condition
instead of pattern matching on a Boolean.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode,
+ isTemporalFunctionJoin: Boolean) {
+
+ val nonEquiJoinPredicates: Option[RexNode] =
Some(remainingNonEquiJoinPredicates)
+
+ def getLeftKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.leftKeys.toIntArray,
+ InternalTypeInfo.of(leftInputType)
+ )
+ }
+
+ def getRightKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.rightKeys.toIntArray,
+ InternalTypeInfo.of(rightInputType)
+ )
+ }
+
+ def getJoinOperator(
+ joinType: JoinRelType,
+ returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData,
RowData] = {
+
+ // input must not be nullable, because the runtime join function will make
sure
+ // the code-generated function won't process null inputs
+ val ctx = CodeGeneratorContext(config)
+ val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+ .bindInput(leftInputType)
+ .bindSecondInput(rightInputType)
+
+ val body = if (nonEquiJoinPredicates.isEmpty) {
+ // only equality condition
+ "return true;"
+ } else {
+ val condition =
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+ s"""
+ |${condition.code}
+ |return ${condition.resultTerm};
+ |""".stripMargin
+ }
+
+ val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+ ctx,
+ "ConditionFunction",
+ body)
+
+ createJoinOperator(config, joinType, generatedJoinCondition)
+ }
+
+ protected def createJoinOperator(
+ tableConfig: TableConfig,
+ joinType: JoinRelType,
+ generatedJoinCondition: GeneratedJoinCondition)
+ : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+ if (isTemporalFunctionJoin) {
+ if (joinType != JoinRelType.INNER) {
+ throw new ValidationException(
+ "Temporal table function join currently only support INNER JOIN and
LEFT JOIN, " +
+ "but was " + joinType.toString + " JOIN")
Review comment:
```suggestion
"Temporal table function join currently only support INNER JOIN, "
+
"but was " + joinType.toString + " JOIN.")
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
##########
@@ -186,7 +280,8 @@ object TemporalJoinUtil {
var rowtimeJoin: Boolean = false
val visitor = new RexVisitorImpl[Unit](true) {
override def visitCall(call: RexCall): Unit = {
- if (isRowTimeTemporalJoinConditionCall(call)) {
+ if (isRowTimeTemporalTableJoinCon(call) ||
+ isRowTimeTemporalFunctionJoinCon(call)) {
Review comment:
Fix the indentation of this continuation line.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode,
+ isTemporalFunctionJoin: Boolean) {
Review comment:
Indent the constructor parameters by one more level.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
##########
@@ -287,10 +287,10 @@ abstract class
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
s" but no row time attribute can be found.")
}
// Deal primary key in TemporalJoinRewriteUniqueKeyRule
- TemporalJoinUtil.makeRowTimeTemporalJoinConditionCall(rexBuilder,
snapshotTimeInputRef,
+ TemporalJoinUtil.makeInitialRowTimeTemporalTableJoinCondCall(rexBuilder,
snapshotTimeInputRef,
Review comment:
Put each parameter on its own line.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode,
+ isTemporalFunctionJoin: Boolean) {
+
+ val nonEquiJoinPredicates: Option[RexNode] =
Some(remainingNonEquiJoinPredicates)
+
+ def getLeftKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.leftKeys.toIntArray,
+ InternalTypeInfo.of(leftInputType)
+ )
+ }
+
+ def getRightKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.rightKeys.toIntArray,
+ InternalTypeInfo.of(rightInputType)
+ )
+ }
+
+ def getJoinOperator(
+ joinType: JoinRelType,
+ returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData,
RowData] = {
+
+ // input must not be nullable, because the runtime join function will make
sure
+ // the code-generated function won't process null inputs
+ val ctx = CodeGeneratorContext(config)
+ val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+ .bindInput(leftInputType)
+ .bindSecondInput(rightInputType)
+
+ val body = if (nonEquiJoinPredicates.isEmpty) {
+ // only equality condition
+ "return true;"
+ } else {
+ val condition =
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+ s"""
+ |${condition.code}
+ |return ${condition.resultTerm};
+ |""".stripMargin
+ }
+
+ val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+ ctx,
+ "ConditionFunction",
+ body)
+
+ createJoinOperator(config, joinType, generatedJoinCondition)
+ }
+
+ protected def createJoinOperator(
+ tableConfig: TableConfig,
+ joinType: JoinRelType,
+ generatedJoinCondition: GeneratedJoinCondition)
+ : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+ if (isTemporalFunctionJoin) {
+ if (joinType != JoinRelType.INNER) {
+ throw new ValidationException(
+ "Temporal table function join currently only support INNER JOIN and
LEFT JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ } else {
+ if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+ throw new TableException(
+ "Temporal table join currently only support INNER JOIN and LEFT
JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ }
+
+ val isLeftOuterJoin = joinType == JoinRelType.LEFT
+ val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+ val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+ if (rightRowTimeAttributeInputReference.isDefined) {
+ if (isTemporalFunctionJoin) {
+ new LegacyTemporalRowTimeJoinOperator(
+ InternalTypeInfo.of(leftInputType),
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ leftTimeAttributeInputReference,
+ rightRowTimeAttributeInputReference.get,
+ minRetentionTime,
+ maxRetentionTime)
+ } else {
+ throw new TableException("Event-time temporal join operator is not
implemented yet.")
+ }
+ } else {
+ if (isTemporalFunctionJoin) {
+ new TemporalProcessTimeJoinOperator(
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ minRetentionTime,
+ maxRetentionTime,
+ isLeftOuterJoin)
+ } else {
+ // current Processing-time temporal join implements has been finished,
but the version from
Review comment:
> The existing TemporalProcessTimeJoinOperator already supports
temporal table join.
> However, the semantics of this implementation are problematic, because the
join processing for the left stream doesn't wait for the complete snapshot of the
temporal table; this may mislead users in production environments.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,25 +18,42 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import java.util
+
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.{TableConfig, TableException,
ValidationException}
import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
ExprCodeGenerator, FunctionCodeGenerator}
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode,
StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
+import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor,
KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector
+import
org.apache.flink.table.runtime.operators.join.temporal.{LegacyTemporalRowTimeJoinOperator,
TemporalProcessTimeJoinOperator}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions.checkState
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
import org.apache.calcite.rex._
-
-import java.util
+import
org.apache.flink.table.planner.calcite.FlinkTypeFactory.{isProctimeIndicatorType,
isRowtimeIndicatorType}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
- * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF).
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table
join,
+ * the only difference is the validation, We reuse most same logic here;
Review comment:
```suggestion
  * the only difference is the validation, so we reuse most of the same logic here.
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode,
+ isTemporalFunctionJoin: Boolean) {
+
+ val nonEquiJoinPredicates: Option[RexNode] =
Some(remainingNonEquiJoinPredicates)
+
+ def getLeftKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.leftKeys.toIntArray,
+ InternalTypeInfo.of(leftInputType)
+ )
+ }
+
+ def getRightKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.rightKeys.toIntArray,
+ InternalTypeInfo.of(rightInputType)
+ )
+ }
+
+ def getJoinOperator(
+ joinType: JoinRelType,
+ returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData,
RowData] = {
+
+ // input must not be nullable, because the runtime join function will make
sure
+ // the code-generated function won't process null inputs
+ val ctx = CodeGeneratorContext(config)
+ val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+ .bindInput(leftInputType)
+ .bindSecondInput(rightInputType)
+
+ val body = if (nonEquiJoinPredicates.isEmpty) {
+ // only equality condition
+ "return true;"
+ } else {
+ val condition =
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+ s"""
+ |${condition.code}
+ |return ${condition.resultTerm};
+ |""".stripMargin
+ }
+
+ val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+ ctx,
+ "ConditionFunction",
+ body)
+
+ createJoinOperator(config, joinType, generatedJoinCondition)
+ }
+
+ protected def createJoinOperator(
+ tableConfig: TableConfig,
+ joinType: JoinRelType,
+ generatedJoinCondition: GeneratedJoinCondition)
+ : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+ if (isTemporalFunctionJoin) {
+ if (joinType != JoinRelType.INNER) {
+ throw new ValidationException(
+ "Temporal table function join currently only support INNER JOIN and
LEFT JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ } else {
+ if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+ throw new TableException(
+ "Temporal table join currently only support INNER JOIN and LEFT
JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ }
+
+ val isLeftOuterJoin = joinType == JoinRelType.LEFT
+ val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+ val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+ if (rightRowTimeAttributeInputReference.isDefined) {
+ if (isTemporalFunctionJoin) {
+ new LegacyTemporalRowTimeJoinOperator(
+ InternalTypeInfo.of(leftInputType),
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ leftTimeAttributeInputReference,
+ rightRowTimeAttributeInputReference.get,
+ minRetentionTime,
+ maxRetentionTime)
+ } else {
+ throw new TableException("Event-time temporal join operator is not
implemented yet.")
+ }
+ } else {
+ if (isTemporalFunctionJoin) {
+ new TemporalProcessTimeJoinOperator(
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ minRetentionTime,
+ maxRetentionTime,
+ isLeftOuterJoin)
+ } else {
+ // current Processing-time temporal join implements has been finished,
but the version from
+ // versioned table may be uncompleted, it may mislead users in
production environment.
+ // So, we disable this feature here, see FLINK-xx for more details.
Review comment:
Create the JIRA issue and replace the `FLINK-xx` placeholder with its issue ID.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode,
+ isTemporalFunctionJoin: Boolean) {
+
+ val nonEquiJoinPredicates: Option[RexNode] =
Some(remainingNonEquiJoinPredicates)
+
+ def getLeftKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.leftKeys.toIntArray,
+ InternalTypeInfo.of(leftInputType)
+ )
+ }
+
+ def getRightKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.rightKeys.toIntArray,
+ InternalTypeInfo.of(rightInputType)
+ )
+ }
+
+ def getJoinOperator(
+ joinType: JoinRelType,
+ returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData,
RowData] = {
+
+ // input must not be nullable, because the runtime join function will make
sure
+ // the code-generated function won't process null inputs
+ val ctx = CodeGeneratorContext(config)
+ val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+ .bindInput(leftInputType)
+ .bindSecondInput(rightInputType)
+
+ val body = if (nonEquiJoinPredicates.isEmpty) {
+ // only equality condition
+ "return true;"
+ } else {
+ val condition =
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+ s"""
+ |${condition.code}
+ |return ${condition.resultTerm};
+ |""".stripMargin
+ }
+
+ val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+ ctx,
+ "ConditionFunction",
+ body)
+
+ createJoinOperator(config, joinType, generatedJoinCondition)
+ }
+
+ protected def createJoinOperator(
+ tableConfig: TableConfig,
+ joinType: JoinRelType,
+ generatedJoinCondition: GeneratedJoinCondition)
+ : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+ if (isTemporalFunctionJoin) {
+ if (joinType != JoinRelType.INNER) {
+ throw new ValidationException(
+ "Temporal table function join currently only support INNER JOIN and
LEFT JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ } else {
+ if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+ throw new TableException(
+ "Temporal table join currently only support INNER JOIN and LEFT
JOIN, " +
+ "but was " + joinType.toString + " JOIN")
Review comment:
```suggestion
"but was " + joinType.toString + " JOIN.")
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,25 +18,42 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import java.util
+
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.{TableConfig, TableException,
ValidationException}
import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
ExprCodeGenerator, FunctionCodeGenerator}
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode,
StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
+import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor,
KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector
+import
org.apache.flink.table.runtime.operators.join.temporal.{LegacyTemporalRowTimeJoinOperator,
TemporalProcessTimeJoinOperator}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions.checkState
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
import org.apache.calcite.rex._
-
-import java.util
+import
org.apache.flink.table.planner.calcite.FlinkTypeFactory.{isProctimeIndicatorType,
isRowtimeIndicatorType}
Review comment:
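Please reorder the imports. This change moves `java.util` to the top of the file and places the `FlinkTypeFactory.{isProctimeIndicatorType, isRowtimeIndicatorType}` import among the Calcite imports.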
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -18,25 +18,42 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import java.util
+
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
+import org.apache.flink.table.api.{TableConfig, TableException,
ValidationException}
import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
ExprCodeGenerator, FunctionCodeGenerator}
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode,
StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
+import
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION,
TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor,
KeySelectorUtil, RelExplainUtil, TemporalJoinUtil}
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector
+import
org.apache.flink.table.runtime.operators.join.temporal.{LegacyTemporalRowTimeJoinOperator,
TemporalProcessTimeJoinOperator}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+import org.apache.flink.util.Preconditions.checkState
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.core.{Join, JoinInfo, JoinRelType}
import org.apache.calcite.rex._
-
-import java.util
+import
org.apache.flink.table.planner.calcite.FlinkTypeFactory.{isProctimeIndicatorType,
isRowtimeIndicatorType}
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
- * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF).
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime).
Review comment:
```suggestion
* temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)).
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode,
+ isTemporalFunctionJoin: Boolean) {
+
+ val nonEquiJoinPredicates: Option[RexNode] =
Some(remainingNonEquiJoinPredicates)
+
+ def getLeftKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.leftKeys.toIntArray,
+ InternalTypeInfo.of(leftInputType)
+ )
+ }
+
+ def getRightKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.rightKeys.toIntArray,
+ InternalTypeInfo.of(rightInputType)
+ )
+ }
+
+ def getJoinOperator(
+ joinType: JoinRelType,
+ returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData,
RowData] = {
+
+ // input must not be nullable, because the runtime join function will make
sure
+ // the code-generated function won't process null inputs
+ val ctx = CodeGeneratorContext(config)
+ val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+ .bindInput(leftInputType)
+ .bindSecondInput(rightInputType)
+
+ val body = if (nonEquiJoinPredicates.isEmpty) {
+ // only equality condition
+ "return true;"
+ } else {
+ val condition =
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+ s"""
+ |${condition.code}
+ |return ${condition.resultTerm};
+ |""".stripMargin
+ }
+
+ val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+ ctx,
+ "ConditionFunction",
+ body)
+
+ createJoinOperator(config, joinType, generatedJoinCondition)
+ }
+
+ protected def createJoinOperator(
+ tableConfig: TableConfig,
+ joinType: JoinRelType,
+ generatedJoinCondition: GeneratedJoinCondition)
+ : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+ if (isTemporalFunctionJoin) {
+ if (joinType != JoinRelType.INNER) {
+ throw new ValidationException(
+ "Temporal table function join currently only support INNER JOIN and
LEFT JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ } else {
+ if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+ throw new TableException(
+ "Temporal table join currently only support INNER JOIN and LEFT
JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ }
+
+ val isLeftOuterJoin = joinType == JoinRelType.LEFT
+ val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+ val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+ if (rightRowTimeAttributeInputReference.isDefined) {
+ if (isTemporalFunctionJoin) {
+ new LegacyTemporalRowTimeJoinOperator(
+ InternalTypeInfo.of(leftInputType),
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ leftTimeAttributeInputReference,
+ rightRowTimeAttributeInputReference.get,
+ minRetentionTime,
+ maxRetentionTime)
+ } else {
+ throw new TableException("Event-time temporal join operator is not
implemented yet.")
+ }
+ } else {
+ if (isTemporalFunctionJoin) {
+ new TemporalProcessTimeJoinOperator(
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ minRetentionTime,
+ maxRetentionTime,
+ isLeftOuterJoin)
+ } else {
+ // current Processing-time temporal join implements has been finished,
but the version from
+ // versioned table may be uncompleted, it may mislead users in
production environment.
+ // So, we disable this feature here, see FLINK-xx for more details.
+ throw new TableException("Processing-time temporal join is not
supported yet.")
+ }
+ }
+ }
+}
+
+object StreamExecTemporalJoinToCoProcessTranslator {
+ def create(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInput: RelNode,
+ rightInput: RelNode,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
+
+ val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
+ val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
+ val isTemporalFunctionJoin =
TemporalJoinUtil.isTemporalFunctionJoin(rexBuilder, joinInfo)
+
+ checkState(
+ !joinInfo.isEqui,
+ "Missing %s in temporal join condition",
+ TEMPORAL_JOIN_CONDITION)
+
+ val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+ textualRepresentation,
+ leftType.getFieldCount,
+ joinInfo,
+ rexBuilder,
+ isTemporalFunctionJoin)
+
+ val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+ val remainingNonEquiJoinPredicates =
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
+
+ val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef:
Option[Int]) =
+ if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+ checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined
&&
+ temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+ "Missing %s in Event-Time temporal join condition",
TEMPORAL_JOIN_CONDITION)
+
+ val leftTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
textualRepresentation)
+ val rightTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.rightTimeAttribute.get,
textualRepresentation)
+ val rightInputRef = rightTimeAttributeInputRef - leftType.getFieldCount
+
+ (leftTimeAttributeInputRef, Some(rightInputRef))
+ } else {
+ val leftTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
textualRepresentation)
+ // right time attribute defined in temporal join condition iff in
Event time join
+ (leftTimeAttributeInputRef, None)
+ }
+
+ new StreamExecTemporalJoinToCoProcessTranslator(
+ textualRepresentation,
+ config,
+ returnType,
+ leftType,
+ rightType,
+ joinInfo,
+ rexBuilder,
+ leftTimeAttributeInputRef,
+ rightRowTimeAttributeInputRef,
+ remainingNonEquiJoinPredicates,
+ isTemporalFunctionJoin)
+ }
+
+ private def extractInputRef(rexNode: RexNode, textualRepresentation:
String): Int = {
+ val inputReferenceVisitor = new InputRefVisitor
+ rexNode.accept(inputReferenceVisitor)
+ checkState(
+ inputReferenceVisitor.getFields.length == 1,
+ "Failed to find input reference in [%s]",
+ textualRepresentation)
+ inputReferenceVisitor.getFields.head
+ }
+
+ private class TemporalJoinConditionExtractor(
+ textualRepresentation: String,
+ rightKeysStartingOffset: Int,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ isTemporalFunctionJoin: Boolean) extends RexShuttle {
+
+ var leftTimeAttribute: Option[RexNode] = None
+
+ var rightTimeAttribute: Option[RexNode] = None
+
+ var rightPrimaryKey: Option[Array[RexNode]] = None
+
+ override def visitCall(call: RexCall): RexNode = {
+ if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+ return super.visitCall(call)
+ }
+ // the condition of temporal function comes from WHERE clause,
+ // so it's not been validated in logical plan
+ if (isTemporalFunctionJoin) {
+ validateAndExtractTemporalFunctionCondition(call)
Review comment:
I would suggest moving the special validation logic into
`TemporalJoinUtil` and unifying the time attribute & primary key extraction for
both the legacy and the new temporal join.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -98,9 +111,8 @@ class TemporalJoinRewriteWithUniqueKeyRule extends
RelOptRule(
rightPrimaryKeyInputRefs: Option[Seq[RexNode]]): Unit = {
if (rightPrimaryKeyInputRefs.isEmpty) {
- throw new ValidationException("Event-Time Temporal Table Join requires
both" +
- s" primary key and row time attribute in versioned table," +
- s" but no primary key can be found.")
+ throw new ValidationException("Temporal Table Join requires" +
+ s" primary key in versioned table, but no primary key can be found.")
Review comment:
It would be better to also print the plan tree, which is helpful for debugging.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
##########
@@ -84,6 +101,361 @@ class StreamExecTemporalJoin(
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
- throw new ValidationException("Physical node of temporal join does not
supported yet.")
+ validateKeyTypes()
+
+ val returnType = FlinkTypeFactory.toLogicalRowType(getRowType)
+
+ val joinTranslator = StreamExecTemporalJoinToCoProcessTranslator.create(
+ this.toString,
+ planner.getTableConfig,
+ returnType,
+ leftRel,
+ rightRel,
+ getJoinInfo,
+ cluster.getRexBuilder)
+
+ val joinOperator = joinTranslator.getJoinOperator(joinType,
returnType.getFieldNames)
+ val leftKeySelector = joinTranslator.getLeftKeySelector
+ val rightKeySelector = joinTranslator.getRightKeySelector
+
+ val leftTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+ val rightTransform = getInputNodes.get(1).translateToPlan(planner)
+ .asInstanceOf[Transformation[RowData]]
+
+ val ret = new TwoInputTransformation[RowData, RowData, RowData](
+ leftTransform,
+ rightTransform,
+ getRelDetailedDescription,
+ joinOperator,
+ InternalTypeInfo.of(returnType),
+ leftTransform.getParallelism)
+
+ if (inputsContainSingleton()) {
+ ret.setParallelism(1)
+ ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelectors(leftKeySelector, rightKeySelector)
+
ret.setStateKeyType(leftKeySelector.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ ret
+ }
+
+ private def validateKeyTypes(): Unit = {
+ // at least one equality expression
+ val leftFields = left.getRowType.getFieldList
+ val rightFields = right.getRowType.getFieldList
+
+ getJoinInfo.pairs().toList.foreach(pair => {
+ val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+ val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+ // check if keys are compatible
+ if (leftKeyType != rightKeyType) {
+ throw new TableException(
+ "Equality join predicate on incompatible types.\n" +
+ s"\tLeft: $left,\n" +
+ s"\tRight: $right,\n" +
+ s"\tCondition: (${RelExplainUtil.expressionToString(
+ getCondition, inputRowType, getExpressionString)})"
+ )
+ }
+ })
+ }
+}
+
+/**
+ * @param rightRowTimeAttributeInputReference is defined only for event time
joins.
+ */
+class StreamExecTemporalJoinToCoProcessTranslator private(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInputType: RowType,
+ rightInputType: RowType,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ leftTimeAttributeInputReference: Int,
+ rightRowTimeAttributeInputReference: Option[Int],
+ remainingNonEquiJoinPredicates: RexNode,
+ isTemporalFunctionJoin: Boolean) {
+
+ val nonEquiJoinPredicates: Option[RexNode] =
Some(remainingNonEquiJoinPredicates)
+
+ def getLeftKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.leftKeys.toIntArray,
+ InternalTypeInfo.of(leftInputType)
+ )
+ }
+
+ def getRightKeySelector: RowDataKeySelector = {
+ KeySelectorUtil.getRowDataSelector(
+ joinInfo.rightKeys.toIntArray,
+ InternalTypeInfo.of(rightInputType)
+ )
+ }
+
+ def getJoinOperator(
+ joinType: JoinRelType,
+ returnFieldNames: Seq[String]): TwoInputStreamOperator[RowData, RowData,
RowData] = {
+
+ // input must not be nullable, because the runtime join function will make
sure
+ // the code-generated function won't process null inputs
+ val ctx = CodeGeneratorContext(config)
+ val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false)
+ .bindInput(leftInputType)
+ .bindSecondInput(rightInputType)
+
+ val body = if (nonEquiJoinPredicates.isEmpty) {
+ // only equality condition
+ "return true;"
+ } else {
+ val condition =
exprGenerator.generateExpression(nonEquiJoinPredicates.get)
+ s"""
+ |${condition.code}
+ |return ${condition.resultTerm};
+ |""".stripMargin
+ }
+
+ val generatedJoinCondition = FunctionCodeGenerator.generateJoinCondition(
+ ctx,
+ "ConditionFunction",
+ body)
+
+ createJoinOperator(config, joinType, generatedJoinCondition)
+ }
+
+ protected def createJoinOperator(
+ tableConfig: TableConfig,
+ joinType: JoinRelType,
+ generatedJoinCondition: GeneratedJoinCondition)
+ : TwoInputStreamOperator[RowData, RowData, RowData] = {
+
+ if (isTemporalFunctionJoin) {
+ if (joinType != JoinRelType.INNER) {
+ throw new ValidationException(
+ "Temporal table function join currently only support INNER JOIN and
LEFT JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ } else {
+ if (joinType != JoinRelType.LEFT && joinType != JoinRelType.INNER) {
+ throw new TableException(
+ "Temporal table join currently only support INNER JOIN and LEFT
JOIN, " +
+ "but was " + joinType.toString + " JOIN")
+ }
+ }
+
+ val isLeftOuterJoin = joinType == JoinRelType.LEFT
+ val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+ val maxRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+ if (rightRowTimeAttributeInputReference.isDefined) {
+ if (isTemporalFunctionJoin) {
+ new LegacyTemporalRowTimeJoinOperator(
+ InternalTypeInfo.of(leftInputType),
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ leftTimeAttributeInputReference,
+ rightRowTimeAttributeInputReference.get,
+ minRetentionTime,
+ maxRetentionTime)
+ } else {
+ throw new TableException("Event-time temporal join operator is not
implemented yet.")
+ }
+ } else {
+ if (isTemporalFunctionJoin) {
+ new TemporalProcessTimeJoinOperator(
+ InternalTypeInfo.of(rightInputType),
+ generatedJoinCondition,
+ minRetentionTime,
+ maxRetentionTime,
+ isLeftOuterJoin)
+ } else {
+ // current Processing-time temporal join implements has been finished,
but the version from
+ // versioned table may be uncompleted, it may mislead users in
production environment.
+ // So, we disable this feature here, see FLINK-xx for more details.
+ throw new TableException("Processing-time temporal join is not
supported yet.")
+ }
+ }
+ }
+}
+
+object StreamExecTemporalJoinToCoProcessTranslator {
+ def create(
+ textualRepresentation: String,
+ config: TableConfig,
+ returnType: RowType,
+ leftInput: RelNode,
+ rightInput: RelNode,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder): StreamExecTemporalJoinToCoProcessTranslator = {
+
+ val leftType = FlinkTypeFactory.toLogicalRowType(leftInput.getRowType)
+ val rightType = FlinkTypeFactory.toLogicalRowType(rightInput.getRowType)
+ val isTemporalFunctionJoin =
TemporalJoinUtil.isTemporalFunctionJoin(rexBuilder, joinInfo)
+
+ checkState(
+ !joinInfo.isEqui,
+ "Missing %s in temporal join condition",
+ TEMPORAL_JOIN_CONDITION)
+
+ val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+ textualRepresentation,
+ leftType.getFieldCount,
+ joinInfo,
+ rexBuilder,
+ isTemporalFunctionJoin)
+
+ val nonEquiJoinRex: RexNode = joinInfo.getRemaining(rexBuilder)
+ val remainingNonEquiJoinPredicates =
temporalJoinConditionExtractor.apply(nonEquiJoinRex)
+
+ val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef:
Option[Int]) =
+ if (TemporalJoinUtil.isRowTimeJoin(rexBuilder, joinInfo)) {
+ checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined
&&
+ temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+ "Missing %s in Event-Time temporal join condition",
TEMPORAL_JOIN_CONDITION)
+
+ val leftTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
textualRepresentation)
+ val rightTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.rightTimeAttribute.get,
textualRepresentation)
+ val rightInputRef = rightTimeAttributeInputRef - leftType.getFieldCount
+
+ (leftTimeAttributeInputRef, Some(rightInputRef))
+ } else {
+ val leftTimeAttributeInputRef = extractInputRef(
+ temporalJoinConditionExtractor.leftTimeAttribute.get,
textualRepresentation)
+ // right time attribute defined in temporal join condition iff in
Event time join
+ (leftTimeAttributeInputRef, None)
+ }
+
+ new StreamExecTemporalJoinToCoProcessTranslator(
+ textualRepresentation,
+ config,
+ returnType,
+ leftType,
+ rightType,
+ joinInfo,
+ rexBuilder,
+ leftTimeAttributeInputRef,
+ rightRowTimeAttributeInputRef,
+ remainingNonEquiJoinPredicates,
+ isTemporalFunctionJoin)
+ }
+
+ private def extractInputRef(rexNode: RexNode, textualRepresentation:
String): Int = {
+ val inputReferenceVisitor = new InputRefVisitor
+ rexNode.accept(inputReferenceVisitor)
+ checkState(
+ inputReferenceVisitor.getFields.length == 1,
+ "Failed to find input reference in [%s]",
+ textualRepresentation)
+ inputReferenceVisitor.getFields.head
+ }
+
+ private class TemporalJoinConditionExtractor(
+ textualRepresentation: String,
+ rightKeysStartingOffset: Int,
+ joinInfo: JoinInfo,
+ rexBuilder: RexBuilder,
+ isTemporalFunctionJoin: Boolean) extends RexShuttle {
+
+ var leftTimeAttribute: Option[RexNode] = None
+
+ var rightTimeAttribute: Option[RexNode] = None
+
+ var rightPrimaryKey: Option[Array[RexNode]] = None
+
+ override def visitCall(call: RexCall): RexNode = {
+ if (call.getOperator != TEMPORAL_JOIN_CONDITION) {
+ return super.visitCall(call)
+ }
+ // the condition of temporal function comes from WHERE clause,
+ // so it's not been validated in logical plan
+ if (isTemporalFunctionJoin) {
+ validateAndExtractTemporalFunctionCondition(call)
+ }
+ // temporal table join has been validated in logical plan, only do
extract here
+ else {
+ if (TemporalJoinUtil.isRowTimeTemporalTableJoinCon(call)) {
+ leftTimeAttribute = Some(call.getOperands.get(0))
+ rightTimeAttribute = Some(call.getOperands.get(1))
+ rightPrimaryKey =
Some(extractPrimaryKeyArray(call.getOperands.get(4)))
Review comment:
We could put the primary key at the 2nd position; then the extraction
logic would be the same as for the legacy temporal join.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]