This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 01cdc703ee6 [FLINK-24239] Event time temporal join should support
values from array, map, row, etc. as join key (#24253)
01cdc703ee6 is described below
commit 01cdc703ee6fa56bdfdf799d016c0e882e9e5d99
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 8 15:26:16 2024 +0100
[FLINK-24239] Event time temporal join should support values from array,
map, row, etc. as join key (#24253)
---
...gicalCorrelateToJoinFromTemporalTableRule.scala | 151 +++---
.../nodes/exec/stream/TemporalJoinRestoreTest.java | 2 +
.../exec/stream/TemporalJoinTestPrograms.java | 82 +++
.../temporal-join-table-join-key-from-map.json | 569 +++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 14977 bytes
.../plan/temporal-join-table-join-nested-key.json | 600 +++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 14973 bytes
7 files changed, 1336 insertions(+), 68 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 217b9597561..25f1d29d8ea 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -26,11 +26,12 @@ import
org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable, Table
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
import org.apache.flink.table.sources.LookupableTableSource
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand,
RelOptUtil}
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.hep.{HepPlanner, HepRelVertex}
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.core.{CorrelationId, TableScan}
import org.apache.calcite.rel.logical._
import org.apache.calcite.rex._
@@ -141,6 +142,30 @@ abstract class LogicalCorrelateToJoinFromTemporalTableRule(
}
case _ => false
}
+
+ protected def decorrelate(
+ rexNode: RexNode,
+ leftRowType: RelDataType,
+ correlationId: CorrelationId): RexNode = {
+ rexNode.accept(new RexShuttle() {
+ // change correlate variable expression to normal RexInputRef (which is
from left side)
+ override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+ fieldAccess.getReferenceExpr match {
+ case corVar: RexCorrelVariable =>
+ require(correlationId.equals(corVar.id))
+ val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+ RexInputRef.of(index, leftRowType)
+ case _ => super.visitFieldAccess(fieldAccess)
+ }
+ }
+
+ // update the field index from right side
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+ new RexInputRef(rightIndex, inputRef.getType)
+ }
+ })
+ }
}
/**
@@ -161,24 +186,7 @@ abstract class
LogicalCorrelateToJoinFromLookupTemporalTableRule(
validateSnapshotInCorrelate(snapshot, correlate)
val leftRowType = leftInput.getRowType
- val joinCondition = filterCondition.accept(new RexShuttle() {
- // change correlate variable expression to normal RexInputRef (which is
from left side)
- override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
- fieldAccess.getReferenceExpr match {
- case corVar: RexCorrelVariable =>
- require(correlate.getCorrelationId.equals(corVar.id))
- val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
- RexInputRef.of(index, leftRowType)
- case _ => super.visitFieldAccess(fieldAccess)
- }
- }
-
- // update the field index from right side
- override def visitInputRef(inputRef: RexInputRef): RexNode = {
- val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
- new RexInputRef(rightIndex, inputRef.getType)
- }
- })
+ val joinCondition = decorrelate(filterCondition, leftRowType,
correlate.getCorrelationId)
val builder = call.builder()
builder.push(leftInput)
@@ -198,8 +206,8 @@ abstract class
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
protected def extractRightEventTimeInputRef(
leftInput: RelNode,
- snapshot: LogicalSnapshot): Option[RexNode] = {
- val rightFields = snapshot.getRowType.getFieldList.asScala
+ rightInput: RelNode): Option[RexNode] = {
+ val rightFields = rightInput.getRowType.getFieldList.asScala
val timeAttributeFields = rightFields.filter(
f =>
f.getType.isInstanceOf[TimeIndicatorRelDataType] &&
@@ -209,7 +217,7 @@ abstract class
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
val timeColIndex = leftInput.getRowType.getFieldCount +
rightFields.indexOf(timeAttributeFields.get(0))
val timeColDataType = timeAttributeFields.get(0).getType
- val rexBuilder = snapshot.getCluster.getRexBuilder
+ val rexBuilder = rightInput.getCluster.getRexBuilder
Some(rexBuilder.makeInputRef(timeColDataType, timeColIndex))
} else {
None
@@ -237,57 +245,32 @@ abstract class
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
val snapshot = getLogicalSnapshot(call)
val leftRowType = leftInput.getRowType
- val joinCondition = filterCondition.accept(new RexShuttle() {
- // change correlate variable expression to normal RexInputRef (which is
from left side)
- override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
- fieldAccess.getReferenceExpr match {
- case corVar: RexCorrelVariable =>
- require(correlate.getCorrelationId.equals(corVar.id))
- val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
- RexInputRef.of(index, leftRowType)
- case _ => super.visitFieldAccess(fieldAccess)
- }
- }
-
- // update the field index from right side
- override def visitInputRef(inputRef: RexInputRef): RexNode = {
- val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
- new RexInputRef(rightIndex, inputRef.getType)
- }
- })
+ val joinCondition = decorrelate(filterCondition, leftRowType,
correlate.getCorrelationId)
validateSnapshotInCorrelate(snapshot, correlate)
val rexBuilder = correlate.getCluster.getRexBuilder
- val (leftJoinKey, rightJoinKey) = {
- val relBuilder = call.builder()
- relBuilder.push(leftInput)
- relBuilder.push(snapshot)
- val rewriteJoin = relBuilder.join(correlate.getJoinType,
joinCondition).build()
- val joinInfo = rewriteJoin.asInstanceOf[LogicalJoin].analyzeCondition()
- val leftJoinKey = joinInfo.leftKeys.map(i =>
rexBuilder.makeInputRef(leftInput, i))
- val leftFieldCnt = leftInput.getRowType.getFieldCount
- val rightJoinKey = joinInfo.rightKeys.map(
- i => {
- val leftKeyType = snapshot.getRowType.getFieldList.get(i).getType
- rexBuilder.makeInputRef(leftKeyType, leftFieldCnt + i)
- })
- if (leftJoinKey.length == 0 || rightJoinKey.length == 0) {
- throw new ValidationException(
- "Currently the join key in Temporal Table Join " +
- "can not be empty.")
- }
- (leftJoinKey, rightJoinKey)
+ val relBuilder = call.builder()
+ relBuilder.push(leftInput)
+ relBuilder.push(snapshot)
+ val nonPushedJoin =
+ relBuilder.join(correlate.getJoinType,
joinCondition).build().asInstanceOf[LogicalJoin]
+ val rewriteJoin = RelOptUtil.pushDownJoinConditions(nonPushedJoin,
relBuilder)
+ val actualJoin = rewriteJoin match {
+ case _: LogicalJoin => rewriteJoin.asInstanceOf[LogicalJoin]
+ case _ => rewriteJoin.getInput(0).asInstanceOf[LogicalJoin]
}
- val snapshotTimeInputRef = extractSnapshotTimeInputRef(leftInput, snapshot)
+ val (leftJoinKey, rightJoinKey) = extractJoinKeys(actualJoin)
+
+ val snapshotTimeInputRef = extractSnapshotTimeInputRef(actualJoin.getLeft,
snapshot)
.getOrElse(
throw new ValidationException(
"Temporal Table Join requires time " +
"attribute in the left table, but no time attribute found."))
val temporalCondition = if (isRowTimeTemporalTableJoin(snapshot)) {
- val rightTimeInputRef = extractRightEventTimeInputRef(leftInput,
snapshot)
+ val rightTimeInputRef =
extractRightEventTimeInputRef(actualJoin.getLeft, actualJoin.getRight)
if (rightTimeInputRef.isEmpty ||
!isRowtimeIndicatorType(rightTimeInputRef.get.getType)) {
throw new ValidationException(
"Event-Time Temporal Table Join requires both" +
@@ -323,15 +306,47 @@ abstract class
LogicalCorrelateToJoinFromGeneralTemporalTableRule(
}
val builder = call.builder()
- val condition = builder.and(joinCondition, temporalCondition)
-
- builder.push(leftInput)
- builder.push(snapshot)
- builder.join(correlate.getJoinType, condition)
- val temporalJoin = builder.build()
+ val condition = builder.and(actualJoin.getCondition, temporalCondition)
+
+ val joinWithTemporalCondition = actualJoin.copy(
+ actualJoin.getTraitSet,
+ condition,
+ actualJoin.getLeft,
+ actualJoin.getRight,
+ actualJoin.getJoinType,
+ actualJoin.isSemiJoinDone)
+
+ val temporalJoin = if (actualJoin != rewriteJoin) {
+ rewriteJoin.replaceInput(0, joinWithTemporalCondition)
+ rewriteJoin
+ } else {
+ joinWithTemporalCondition
+ }
call.transformTo(temporalJoin)
}
+ private def extractJoinKeys(actualJoin: LogicalJoin): (Seq[RexNode],
Seq[RexNode]) = {
+
+ val joinInfo = actualJoin.analyzeCondition()
+ val leftInput = actualJoin.getInput(0)
+ val rightInput = actualJoin.getInput(1)
+ val rexBuilder = actualJoin.getCluster.getRexBuilder
+
+ val leftJoinKey = joinInfo.leftKeys.map(i =>
rexBuilder.makeInputRef(leftInput, i))
+ val leftFieldCnt = leftInput.getRowType.getFieldCount
+ val rightJoinKey = joinInfo.rightKeys.map(
+ i => {
+ val rightKeyType = rightInput.getRowType.getFieldList.get(i).getType
+ rexBuilder.makeInputRef(rightKeyType, leftFieldCnt + i)
+ })
+ if (leftJoinKey.isEmpty || rightJoinKey.isEmpty) {
+ throw new ValidationException(
+ "Currently the join key in Temporal Table Join " +
+ "can not be empty.")
+ }
+ (leftJoinKey, rightJoinKey)
+ }
+
private def isRowTimeTemporalTableJoin(snapshot: LogicalSnapshot): Boolean =
snapshot.getPeriod.getType match {
case t: TimeIndicatorRelDataType if t.isEventTime => true
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
index 44f48fc24c3..a8584ac951f 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
@@ -35,6 +35,8 @@ public class TemporalJoinRestoreTest extends RestoreTestBase {
public List<TableTestProgram> programs() {
return Arrays.asList(
TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN,
+ TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY,
+ TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP,
TemporalJoinTestPrograms.TEMPORAL_JOIN_TEMPORAL_FUNCTION);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
index 883d628fbd9..ed83169c74a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
@@ -23,6 +23,9 @@ import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.apache.flink.table.api.Expressions.$;
/** {@link TableTestProgram} definitions for testing {@link
StreamExecTemporalJoin}. */
@@ -45,6 +48,49 @@ public class TemporalJoinTestPrograms {
Row.of(1L, "USD", "2020-10-10 00:00:58"))
.build();
+ static final SourceTestStep ORDERS_WITH_NESTED_ID =
+ SourceTestStep.newBuilder("OrdersNestedId")
+ .addSchema(
+ "amount bigint",
+ "nested_row ROW<currency STRING>",
+ "nested_map MAP<STRING NOT NULL, STRING>",
+ "order_time STRING",
+ "rowtime as TO_TIMESTAMP(order_time) ",
+ "WATERMARK FOR rowtime AS rowtime")
+ .producedBeforeRestore(
+ Row.of(
+ 2L,
+ Row.of("Euro"),
+ mapOf("currency", "Euro"),
+ "2020-10-10 00:00:42"),
+ Row.of(
+ 1L,
+ Row.of("usd"),
+ mapOf("currency", "USD"),
+ "2020-10-10 00:00:43"),
+ Row.of(
+ 50L,
+ Row.of("Yen"),
+ mapOf("currency", "Yen"),
+ "2020-10-10 00:00:44"),
+ Row.of(
+ 3L,
+ Row.of("Euro"),
+ mapOf("currency", "Euro"),
+ "2020-10-10 00:00:45"))
+ .producedAfterRestore(
+ Row.of(
+ 1L,
+ Row.of("Euro"),
+ mapOf("currency", "Euro"),
+ "2020-10-10 00:00:58"),
+ Row.of(
+ 1L,
+ Row.of("usd"),
+ mapOf("currency", "USD"),
+ "2020-10-10 00:00:58"))
+ .build();
+
static final SourceTestStep RATES =
SourceTestStep.newBuilder("RatesHistory")
.addSchema(
@@ -84,6 +130,36 @@ public class TemporalJoinTestPrograms {
+ "ON o.currency = r.currency ")
.build();
+ static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_NESTED_KEY =
+ TableTestProgram.of(
+ "temporal-join-table-join-nested-key",
+ "validates temporal join with a table when the
join keys comes from a nested row")
+ .setupTableSource(ORDERS_WITH_NESTED_ID)
+ .setupTableSource(RATES)
+ .setupTableSink(AMOUNTS)
+ .runSql(
+ "INSERT INTO MySink "
+ + "SELECT amount * r.rate "
+ + "FROM OrdersNestedId AS o "
+ + "JOIN RatesHistory FOR SYSTEM_TIME AS OF
o.rowtime AS r "
+ + "ON (case when o.nested_row.currency =
'usd' then upper(o.nested_row.currency) ELSE o.nested_row.currency END) =
r.currency ")
+ .build();
+
+ static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN_KEY_FROM_MAP =
+ TableTestProgram.of(
+ "temporal-join-table-join-key-from-map",
+ "validates temporal join with a table when the
join key comes from a map value")
+ .setupTableSource(ORDERS_WITH_NESTED_ID)
+ .setupTableSource(RATES)
+ .setupTableSink(AMOUNTS)
+ .runSql(
+ "INSERT INTO MySink "
+ + "SELECT amount * r.rate "
+ + "FROM OrdersNestedId AS o "
+ + "JOIN RatesHistory FOR SYSTEM_TIME AS OF
o.rowtime AS r "
+ + "ON o.nested_map['currency'] =
r.currency ")
+ .build();
+
static final TableTestProgram TEMPORAL_JOIN_TEMPORAL_FUNCTION =
TableTestProgram.of(
"temporal-join-temporal-function",
@@ -100,4 +176,10 @@ public class TemporalJoinTestPrograms {
+ "LATERAL TABLE (Rates(o.rowtime)) AS r "
+ "WHERE o.currency = r.currency ")
.build();
+
+ private static Map<String, String> mapOf(String key, String value) {
+ final HashMap<String, String> map = new HashMap<>();
+ map.put(key, value);
+ return map;
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json
new file mode 100644
index 00000000000..130b8af8d4c
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/plan/temporal-join-table-join-key-from-map.json
@@ -0,0 +1,569 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 24,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`OrdersNestedId`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "amount",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "nested_row",
+ "dataType" : "ROW<`currency` VARCHAR(2147483647)>"
+ }, {
+ "name" : "nested_map",
+ "dataType" : "MAP<VARCHAR(2147483647) NOT NULL,
VARCHAR(2147483647)>"
+ }, {
+ "name" : "order_time",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`order_time`)"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 3 ], [ 2 ] ],
+ "producedType" : "ROW<`amount` BIGINT, `order_time`
VARCHAR(2147483647), `nested_map` MAP<VARCHAR(2147483647) NOT NULL,
VARCHAR(2147483647)>> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`amount` BIGINT, `order_time`
VARCHAR(2147483647), `nested_map` MAP<VARCHAR(2147483647) NOT NULL,
VARCHAR(2147483647)>> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647),
`nested_map` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, OrdersNestedId, project=[amount, order_time, nested_map],
metadata=[]]], fields=[amount, order_time, nested_map])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 25,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`amount` BIGINT, `rowtime` TIMESTAMP(3), `nested_map`
MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>>",
+ "description" : "Calc(select=[amount, TO_TIMESTAMP(order_time) AS rowtime,
nested_map])"
+ }, {
+ "id" : 26,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 1,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "nested_map",
+ "fieldType" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>"
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 27,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$ITEM$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)>"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "currency",
+ "type" : "CHAR(8) NOT NULL"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$f5",
+ "fieldType" : "VARCHAR(2147483647)"
+ } ]
+ },
+ "description" : "Calc(select=[amount, rowtime, ITEM(nested_map,
'currency') AS $f5])"
+ }, {
+ "id" : 28,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 2 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$f5",
+ "fieldType" : "VARCHAR(2147483647)"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[$f5]])"
+ }, {
+ "id" : 29,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "currency",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "rate_time",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`rate_time`)"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ],
+ "primaryKey" : {
+ "name" : "PK_currency",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "currency" ]
+ }
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate`
BIGINT, `rate_time` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, RatesHistory]], fields=[currency, rate, rate_time])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 30,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate`
BIGINT, `rowtime` TIMESTAMP(3)>",
+ "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS
rowtime])"
+ }, {
+ "id" : 31,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "currency",
+ "fieldType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 32,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "currency",
+ "fieldType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[currency]])"
+ }, {
+ "id" : 33,
+ "type" : "stream-exec-temporal-join_1",
+ "joinSpec" : {
+ "joinType" : "INNER",
+ "leftKeys" : [ 2 ],
+ "rightKeys" : [ 0 ],
+ "filterNulls" : [ true ],
+ "nonEquiCondition" : null
+ },
+ "isTemporalFunctionJoin" : false,
+ "leftTimeAttributeIndex" : 1,
+ "rightTimeAttributeIndex" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$f5",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "currency",
+ "fieldType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime0",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "TemporalJoin(joinType=[InnerJoin], where=[(($f5 =
currency) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency), __TEMPORAL_JOIN_LEFT_KEY($f5),
__TEMPORAL_JOIN_RIGHT_KEY(currency)))], select=[amount, rowtime, $f5, currency,
rate, rowtime0])"
+ }, {
+ "id" : 34,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT>",
+ "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
+ }, {
+ "id" : 35,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "amount",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[EXPR$0])"
+ } ],
+ "edges" : [ {
+ "source" : 24,
+ "target" : 25,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 25,
+ "target" : 26,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 26,
+ "target" : 27,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 27,
+ "target" : 28,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 29,
+ "target" : 30,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 30,
+ "target" : 31,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 31,
+ "target" : 32,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 28,
+ "target" : 33,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 32,
+ "target" : 33,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 33,
+ "target" : 34,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 34,
+ "target" : 35,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata
new file mode 100644
index 00000000000..ac6d04137e8
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-key-from-map/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json
new file mode 100644
index 00000000000..679b3835273
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/plan/temporal-join-table-join-nested-key.json
@@ -0,0 +1,600 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 12,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`OrdersNestedId`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "amount",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "nested_row",
+ "dataType" : "ROW<`currency` VARCHAR(2147483647)>"
+ }, {
+ "name" : "nested_map",
+ "dataType" : "MAP<VARCHAR(2147483647) NOT NULL,
VARCHAR(2147483647)>"
+ }, {
+ "name" : "order_time",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`order_time`)"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 3 ], [ 1 ] ],
+ "producedType" : "ROW<`amount` BIGINT, `order_time`
VARCHAR(2147483647), `nested_row` ROW<`currency` VARCHAR(2147483647)>> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`amount` BIGINT, `order_time`
VARCHAR(2147483647), `nested_row` ROW<`currency` VARCHAR(2147483647)>> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`amount` BIGINT, `order_time` VARCHAR(2147483647),
`nested_row` ROW<`currency` VARCHAR(2147483647)>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, OrdersNestedId, project=[amount, order_time, nested_row],
metadata=[]]], fields=[amount, order_time, nested_row])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 13,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "ROW<`currency` VARCHAR(2147483647)>"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`amount` BIGINT, `rowtime` TIMESTAMP(3), `nested_row`
ROW<`currency` VARCHAR(2147483647)>>",
+ "description" : "Calc(select=[amount, TO_TIMESTAMP(order_time) AS rowtime,
nested_row])"
+ }, {
+ "id" : 14,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 1,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "nested_row",
+ "fieldType" : "ROW<`currency` VARCHAR(2147483647)>"
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 15,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CASE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "currency",
+ "expr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "ROW<`currency` VARCHAR(2147483647)>"
+ }
+ }, {
+ "kind" : "LITERAL",
+ "value" : "usd",
+ "type" : "VARCHAR(2147483647) NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$UPPER$1",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "currency",
+ "expr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "ROW<`currency` VARCHAR(2147483647)>"
+ }
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "FIELD_ACCESS",
+ "name" : "currency",
+ "expr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "ROW<`currency` VARCHAR(2147483647)>"
+ }
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$f5",
+ "fieldType" : "VARCHAR(2147483647)"
+ } ]
+ },
+ "description" : "Calc(select=[amount, rowtime, CASE((nested_row.currency =
'usd'), UPPER(nested_row.currency), nested_row.currency) AS $f5])"
+ }, {
+ "id" : 16,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 2 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$f5",
+ "fieldType" : "VARCHAR(2147483647)"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[$f5]])"
+ }, {
+ "id" : 17,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "currency",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "rate_time",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`rate_time`)"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ],
+ "primaryKey" : {
+ "name" : "PK_currency",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "currency" ]
+ }
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate`
BIGINT, `rate_time` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, RatesHistory]], fields=[currency, rate, rate_time])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 18,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate`
BIGINT, `rowtime` TIMESTAMP(3)>",
+ "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS
rowtime])"
+ }, {
+ "id" : 19,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "currency",
+ "fieldType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 20,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "currency",
+ "fieldType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[currency]])"
+ }, {
+ "id" : 21,
+ "type" : "stream-exec-temporal-join_1",
+ "joinSpec" : {
+ "joinType" : "INNER",
+ "leftKeys" : [ 2 ],
+ "rightKeys" : [ 0 ],
+ "filterNulls" : [ true ],
+ "nonEquiCondition" : null
+ },
+ "isTemporalFunctionJoin" : false,
+ "leftTimeAttributeIndex" : 1,
+ "rightTimeAttributeIndex" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "amount",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$f5",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "currency",
+ "fieldType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "rate",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime0",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "TemporalJoin(joinType=[InnerJoin], where=[(($f5 =
currency) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0,
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency), __TEMPORAL_JOIN_LEFT_KEY($f5),
__TEMPORAL_JOIN_RIGHT_KEY(currency)))], select=[amount, rowtime, $f5, currency,
rate, rowtime0])"
+ }, {
+ "id" : 22,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT>",
+ "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
+ }, {
+ "id" : 23,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "amount",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[EXPR$0])"
+ } ],
+ "edges" : [ {
+ "source" : 12,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 13,
+ "target" : 14,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 14,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 15,
+ "target" : 16,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 17,
+ "target" : 18,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 18,
+ "target" : 19,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 19,
+ "target" : 20,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 16,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 20,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 21,
+ "target" : 22,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 22,
+ "target" : 23,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata
new file mode 100644
index 00000000000..c70770ac5db
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join-nested-key/savepoint/_metadata
differ