This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 46d817d8d297b50fe91b5fb9471bda791a6f4319
Author: Jim Hughes <jhug...@confluent.io>
AuthorDate: Mon Dec 11 13:26:42 2023 -0500

    [FLINK-33767] Implement restore tests for TemporalJoin node
    
    This closes #23916
---
 .../flink/table/test/program/TableTestProgram.java |  22 +
 .../test/program/TemporalFunctionTestStep.java     |  67 +++
 .../apache/flink/table/test/program/TestStep.java  |   1 +
 .../nodes/exec/stream/TemporalJoinRestoreTest.java |  40 ++
 .../exec/stream/TemporalJoinTestPrograms.java      | 103 +++++
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   3 +
 .../testTemporalTableJoin.json                     | 421 ++++++++++++++++++
 .../plan/temporal-join-table-join.json             | 494 +++++++++++++++++++++
 .../temporal-join-table-join/savepoint/_metadata   | Bin 0 -> 14926 bytes
 .../plan/temporal-join-temporal-function.json      | 494 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 14926 bytes
 11 files changed, 1645 insertions(+)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
index 7777f5323b0..7d4c4b45eb5 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.test.program;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior;
 import 
org.apache.flink.table.test.program.FunctionTestStep.FunctionPersistence;
@@ -176,6 +177,14 @@ public class TableTestProgram {
                 .collect(Collectors.toList());
     }
 
+    /** Convenience method to avoid casting. It assumes that the order of steps is not important. */
+    public List<TemporalFunctionTestStep> getSetupTemporalFunctionTestSteps() {
+        return setupSteps.stream()
+                .filter(s -> s.getKind() == TestKind.TEMPORAL_FUNCTION)
+                .map(TemporalFunctionTestStep.class::cast)
+                .collect(Collectors.toList());
+    }
+
     /**
      * Convenience method to avoid boilerplate code. It assumes that only a 
single SQL statement is
      * tested.
@@ -231,6 +240,19 @@ public class TableTestProgram {
             return this;
         }
 
+        /** Setup step for registering a temporary system temporal table function. */
+        public Builder setupTemporarySystemTemporalTableFunction(
+                String name, String table, Expression timeAttribute, Expression primaryKey) {
+            this.setupSteps.add(
+                    new TemporalFunctionTestStep(
+                            TemporalFunctionTestStep.FunctionBehavior.SYSTEM,
+                            name,
+                            table,
+                            timeAttribute,
+                            primaryKey));
+            return this;
+        }
+
         /** Setup step for registering a temporary catalog function. */
         public Builder setupTemporaryCatalogFunction(
                 String name, Class<? extends UserDefinedFunction> function) {
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java
new file mode 100644
index 00000000000..206f7fa38c1
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TemporalFunctionTestStep.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.TemporalTableFunction;
+
+/** Test step for registering a temporary (system or catalog) temporal table function. */
+public final class TemporalFunctionTestStep implements TestStep {
+
+    /** Whether the function is registered as a system function or as a catalog function. */
+    enum FunctionBehavior {
+        SYSTEM,
+        CATALOG
+    }
+
+    public final FunctionBehavior behavior;
+    public final String name;
+    public final String table;
+    public final Expression timeAttribute;
+    public final Expression primaryKey;
+
+    TemporalFunctionTestStep(
+            FunctionBehavior behavior,
+            String name,
+            String table,
+            Expression timeAttribute,
+            Expression primaryKey) {
+        this.behavior = behavior;
+        this.name = name;
+        this.table = table;
+        this.timeAttribute = timeAttribute;
+        this.primaryKey = primaryKey;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return TestKind.TEMPORAL_FUNCTION;
+    }
+
+    public void apply(TableEnvironment env) {
+        TemporalTableFunction function =
+                env.from(table).createTemporalTableFunction(timeAttribute, primaryKey);
+        if (behavior == FunctionBehavior.SYSTEM) {
+            env.createTemporarySystemFunction(name, function);
+        } else {
+            env.createTemporaryFunction(name, function);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java
index 6839a66067c..293789a2353 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java
@@ -43,6 +43,7 @@ public interface TestStep {
         STATEMENT_SET,
         CONFIG,
         FUNCTION,
+        TEMPORAL_FUNCTION,
         SOURCE_WITHOUT_DATA,
         SOURCE_WITH_DATA,
         SOURCE_WITH_RESTORE_DATA,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
new file mode 100644
index 00000000000..44f48fc24c3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecTemporalJoin}. */
+public class TemporalJoinRestoreTest extends RestoreTestBase {
+
+    public TemporalJoinRestoreTest() {
+        super(StreamExecTemporalJoin.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                TemporalJoinTestPrograms.TEMPORAL_JOIN_TABLE_JOIN,
+                TemporalJoinTestPrograms.TEMPORAL_JOIN_TEMPORAL_FUNCTION);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
new file mode 100644
index 00000000000..883d628fbd9
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecTemporalJoin}. */
+public class TemporalJoinTestPrograms {
+    static final SourceTestStep ORDERS =
+            SourceTestStep.newBuilder("Orders")
+                    .addSchema(
+                            "amount bigint",
+                            "currency STRING",
+                            "order_time STRING",
+                            "rowtime as TO_TIMESTAMP(order_time) ",
+                            "WATERMARK FOR rowtime AS rowtime")
+                    .producedBeforeRestore(
+                            Row.of(2L, "Euro", "2020-10-10 00:00:42"),
+                            Row.of(1L, "USD", "2020-10-10 00:00:43"),
+                            Row.of(50L, "Yen", "2020-10-10 00:00:44"),
+                            Row.of(3L, "Euro", "2020-10-10 00:00:45"))
+                    .producedAfterRestore(
+                            Row.of(1L, "Euro", "2020-10-10 00:00:58"),
+                            Row.of(1L, "USD", "2020-10-10 00:00:58"))
+                    .build();
+
+    static final SourceTestStep RATES =
+            SourceTestStep.newBuilder("RatesHistory")
+                    .addSchema(
+                            "currency STRING",
+                            "rate bigint",
+                            "rate_time STRING",
+                            "rowtime as TO_TIMESTAMP(rate_time) ",
+                            "WATERMARK FOR rowtime AS rowtime",
+                            "PRIMARY KEY(currency) NOT ENFORCED")
+                    .producedBeforeRestore(
+                            Row.of("USD", 102L, "2020-10-10 00:00:41"),
+                            Row.of("Euro", 114L, "2020-10-10 00:00:41"),
+                            Row.of("Yen", 1L, "2020-10-10 00:00:41"),
+                            Row.of("Euro", 116L, "2020-10-10 00:00:45"),
+                            Row.of("Euro", 119L, "2020-10-10 00:00:47"))
+                    .producedAfterRestore(
+                            Row.of("USD", 103L, "2020-10-10 00:00:58"),
+                            Row.of("Euro", 120L, "2020-10-10 00:00:59"))
+                    .build();
+
+    static final SinkTestStep AMOUNTS =
+            SinkTestStep.newBuilder("MySink")
+                    .addSchema("amount bigint")
+                    .consumedBeforeRestore("+I[102]", "+I[228]", "+I[348]", "+I[50]")
+                    .consumedAfterRestore("+I[103]", "+I[119]")
+                    .build();
+    static final TableTestProgram TEMPORAL_JOIN_TABLE_JOIN =
+            TableTestProgram.of("temporal-join-table-join", "validates temporal join with a table")
+                    .setupTableSource(ORDERS)
+                    .setupTableSource(RATES)
+                    .setupTableSink(AMOUNTS)
+                    .runSql(
+                            "INSERT INTO MySink "
+                                    + "SELECT amount * r.rate "
+                                    + "FROM Orders AS o "
+                                    + "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r "
+                                    + "ON o.currency = r.currency ")
+                    .build();
+
+    static final TableTestProgram TEMPORAL_JOIN_TEMPORAL_FUNCTION =
+            TableTestProgram.of(
+                            "temporal-join-temporal-function",
+                            "validates temporal join with a temporal function")
+                    .setupTableSource(ORDERS)
+                    .setupTableSource(RATES)
+                    .setupTemporarySystemTemporalTableFunction(
+                            "Rates", "RatesHistory", $("rowtime"), $("currency"))
+                    .setupTableSink(AMOUNTS)
+                    .runSql(
+                            "INSERT INTO MySink "
+                                    + "SELECT amount * r.rate "
+                                    + "FROM Orders AS o,  "
+                                    + "LATERAL TABLE (Rates(o.rowtime)) AS r "
+                                    + "WHERE o.currency = r.currency ")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 8dd4646df10..619c3b3ba67 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -114,6 +114,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         return EnumSet.of(
                 TestKind.CONFIG,
                 TestKind.FUNCTION,
+                TestKind.TEMPORAL_FUNCTION,
                 TestKind.SOURCE_WITH_RESTORE_DATA,
                 TestKind.SINK_WITH_RESTORE_DATA);
     }
@@ -207,6 +208,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         }
 
         program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+        program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv));
 
         final SqlTestStep sqlTestStep = program.getRunSqlTestStep();
 
@@ -271,6 +273,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         }
 
         program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+        program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv));
 
         final CompiledPlan compiledPlan =
                 tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, 
metadata)));
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json
new file mode 100644
index 00000000000..bbfd1524d3b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json
@@ -0,0 +1,421 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`Orders`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "INT"
+            }, {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 2,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), 
`rowtime` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[amount, currency, rowtime])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "INT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "INT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "rate",
+              "dataType" : "INT"
+            }, {
+              "name" : "rowtime",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 2,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ],
+            "primaryKey" : {
+              "name" : "PK_currency",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "currency" ]
+            }
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, 
`rowtime` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rowtime])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "INT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "INT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-temporal-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 1 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "isTemporalFunctionJoin" : false,
+    "leftTimeAttributeIndex" : 2,
+    "rightTimeAttributeIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "INT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "currency0",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "INT"
+      }, {
+        "name" : "rowtime0",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = 
currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), 
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], 
select=[amount, currency, rowtime, currency0, rate, rowtime0])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$*$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/plan/temporal-join-table-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/plan/temporal-join-table-join.json
new file mode 100644
index 00000000000..741b5a53df3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/plan/temporal-join-table-join.json
@@ -0,0 +1,494 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`Orders`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`order_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), 
`order_time` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[amount, currency, order_time])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), 
`rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[amount, currency, TO_TIMESTAMP(order_time) 
AS rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "rate",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "rate_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`rate_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ],
+            "primaryKey" : {
+              "name" : "PK_currency",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "currency" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rate_time` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rate_time])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS 
rowtime])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-temporal-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 1 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "isTemporalFunctionJoin" : false,
+    "leftTimeAttributeIndex" : 2,
+    "rightTimeAttributeIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "currency0",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime0",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = 
currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), 
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], 
select=[amount, currency, rowtime, currency0, rate, rowtime0])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$*$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/savepoint/_metadata
new file mode 100644
index 00000000000..4b11a62993e
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-table-join/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/plan/temporal-join-temporal-function.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/plan/temporal-join-temporal-function.json
new file mode 100644
index 00000000000..c6db00d4ef2
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/plan/temporal-join-temporal-function.json
@@ -0,0 +1,494 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 12,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`Orders`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "order_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`order_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), 
`order_time` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[amount, currency, order_time])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`amount` BIGINT, `currency` VARCHAR(2147483647), 
`rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[amount, currency, TO_TIMESTAMP(order_time) 
AS rowtime])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "currency",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "rate",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "rate_time",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`rate_time`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ],
+            "primaryKey" : {
+              "name" : "PK_currency",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "currency" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rate_time` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rate_time])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` 
BIGINT, `rowtime` TIMESTAMP(3)>",
+    "description" : "Calc(select=[currency, rate, TO_TIMESTAMP(rate_time) AS 
rowtime])"
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[currency]])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-temporal-join_1",
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 1 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "isTemporalFunctionJoin" : true,
+    "leftTimeAttributeIndex" : 2,
+    "rightTimeAttributeIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "amount",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "currency",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "currency0",
+        "fieldType" : "VARCHAR(2147483647) NOT NULL"
+      }, {
+        "name" : "rate",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime0",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TemporalJoin(joinType=[InnerJoin], 
where=[(__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0)) AND (currency = currency0))], 
select=[amount, currency, rowtime, currency0, rate, rowtime0])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$*$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
+  }, {
+    "id" : 22,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "amount",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
+  } ],
+  "edges" : [ {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/savepoint/_metadata
new file mode 100644
index 00000000000..e3307ab0c9e
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-temporal-join_1/temporal-join-temporal-function/savepoint/_metadata
 differ

Reply via email to