This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 898935d75b2 [FLINK-33422] Implement restore tests for Calc node
898935d75b2 is described below

commit 898935d75b2fc073bcee61758b3b0443cba584ca
Author: bvarghese1 <[email protected]>
AuthorDate: Mon Oct 30 17:20:41 2023 -0700

    [FLINK-33422] Implement restore tests for Calc node
---
 .../flink/table/test/program/TableTestProgram.java |   5 +
 .../plan/nodes/exec/testutils/CalcRestoreTest.java |  11 +-
 .../nodes/exec/testutils/CalcTestPrograms.java     | 166 +++++++++++++
 .../plan/nodes/exec/testutils/RestoreTestBase.java |  13 +-
 .../plan/calc-filter-pushdown.json                 |  92 +++++++
 .../calc-filter-pushdown/savepoint/_metadata       | Bin 0 -> 6981 bytes
 .../calc-filter/plan/calc-filter.json              | 141 +++++++++++
 .../calc-filter/savepoint/_metadata                | Bin 0 -> 9372 bytes
 .../plan/calc-project-pushdown.json                | 135 ++++++++++
 .../calc-project-pushdown/savepoint/_metadata      | Bin 0 -> 7090 bytes
 .../calc-sarg/plan/calc-sarg.json                  | 132 ++++++++++
 .../calc-sarg/savepoint/_metadata                  | Bin 0 -> 5874 bytes
 .../calc-udf-complex/plan/calc-udf-complex.json    | 272 +++++++++++++++++++++
 .../calc-udf-complex/savepoint/_metadata           | Bin 0 -> 13385 bytes
 .../calc-udf-simple/plan/calc-udf-simple.json      | 111 +++++++++
 .../calc-udf-simple/savepoint/_metadata            | Bin 0 -> 7016 bytes
 16 files changed, 1073 insertions(+), 5 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
index 731c967bba1..5af08b29998 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -103,6 +103,11 @@ public class TableTestProgram {
         this.runSteps = runSteps;
     }
 
+    @Override
+    public String toString() {
+        return id;
+    }
+
     /**
      * Entrypoint for a {@link TableTestProgram} that forces an identifier and 
description of the
      * test program.
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
index 117316aa8e1..2b408bfcd67 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
@@ -21,7 +21,7 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.testutils;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
 import org.apache.flink.table.test.program.TableTestProgram;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 
 /** Restore tests for {@link StreamExecCalc}. */
@@ -33,6 +33,13 @@ public class CalcRestoreTest extends RestoreTestBase {
 
     @Override
     public List<TableTestProgram> programs() {
-        return Collections.singletonList(CalcTestPrograms.SIMPLE_CALC);
+        return Arrays.asList(
+                CalcTestPrograms.SIMPLE_CALC,
+                CalcTestPrograms.CALC_FILTER,
+                CalcTestPrograms.CALC_FILTER_PUSHDOWN,
+                CalcTestPrograms.CALC_PROJECT_PUSHDOWN,
+                CalcTestPrograms.CALC_SARG,
+                CalcTestPrograms.CALC_UDF_SIMPLE,
+                CalcTestPrograms.CALC_UDF_COMPLEX);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
index feb151ba9ef..51d23f3183c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
@@ -19,11 +19,18 @@
 package org.apache.flink.table.planner.plan.nodes.exec.testutils;
 
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc1;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc2;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.UdfWithOpen;
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 
+import java.time.LocalDateTime;
+
 /** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */
 public class CalcTestPrograms {
 
@@ -43,4 +50,163 @@ public class CalcTestPrograms {
                                     .build())
                     .runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
                     .build();
+
+    static final TableTestProgram CALC_PROJECT_PUSHDOWN =
+            TableTestProgram.of(
+                            "calc-project-pushdown", "validates calc node with 
project pushdown")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a BIGINT", "b DOUBLE")
+                                    .addOption("filterable-fields", "a")
+                                    .producedBeforeRestore(Row.of(421L, 42.1))
+                                    .producedAfterRestore(Row.of(421L, 42.1))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a BIGINT", "a1 VARCHAR")
+                                    .consumedBeforeRestore(Row.of(421L, "421"))
+                                    .consumedAfterRestore(Row.of(421L, "421"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT a, CAST(a AS VARCHAR) 
FROM source_t WHERE a > CAST(1 AS BIGINT)")
+                    .build();
+
+    static final TableTestProgram CALC_FILTER =
+            TableTestProgram.of("calc-filter", "validates calc node with 
filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a BIGINT", "b INT", "c 
DOUBLE", "d VARCHAR")
+                                    .producedBeforeRestore(Row.of(420L, 1, 
42.0, "hello"))
+                                    .producedAfterRestore(Row.of(420L, 1, 
42.0, "hello"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a BIGINT", "b INT", "c 
DOUBLE", "d VARCHAR")
+                                    .consumedBeforeRestore(Row.of(420L, 1, 
42.0, "hello"))
+                                    .consumedAfterRestore(Row.of(420L, 1, 
42.0, "hello"))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * FROM source_t WHERE b 
> 0")
+                    .build();
+
+    static final TableTestProgram CALC_FILTER_PUSHDOWN =
+            TableTestProgram.of("calc-filter-pushdown", "validates calc node 
with filter pushdown")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a BIGINT", "b DOUBLE")
+                                    .addOption("filterable-fields", "a")
+                                    .producedBeforeRestore(Row.of(421L, 42.1))
+                                    .producedAfterRestore(Row.of(421L, 42.1))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a BIGINT", "b DOUBLE")
+                                    .consumedBeforeRestore(Row.of(421L, 42.1))
+                                    .consumedAfterRestore(Row.of(421L, 42.1))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT a, b FROM source_t 
WHERE a > CAST(420 AS BIGINT)")
+                    .build();
+
+    static final TableTestProgram CALC_SARG =
+            TableTestProgram.of("calc-sarg", "validates calc node with Sarg")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT")
+                                    .addOption("filterable-fields", "a")
+                                    .producedBeforeRestore(Row.of(1))
+                                    .producedAfterRestore(Row.of(1))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT")
+                                    .consumedBeforeRestore(Row.of(1))
+                                    .consumedAfterRestore(Row.of(1))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT a FROM source_t WHERE a 
= 1 or a = 2 or a is null")
+                    .build();
+
+    static final TableTestProgram CALC_UDF_SIMPLE =
+            TableTestProgram.of("calc-udf-simple", "validates calc node with 
simple UDF")
+                    .setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT")
+                                    .producedBeforeRestore(Row.of(5))
+                                    .producedAfterRestore(Row.of(5))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "a1 BIGINT")
+                                    .consumedBeforeRestore(Row.of(5, 6L))
+                                    .consumedAfterRestore(Row.of(5, 6L))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, udf1(a) FROM 
source_t")
+                    .build();
+
+    static final TableTestProgram CALC_UDF_COMPLEX =
+            TableTestProgram.of("calc-udf-complex", "validates calc node with 
complex UDFs")
+                    .setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
+                    .setupTemporaryCatalogFunction("udf2", JavaFunc1.class)
+                    .setupTemporarySystemFunction("udf3", JavaFunc2.class)
+                    .setupTemporarySystemFunction("udf4", UdfWithOpen.class)
+                    .setupCatalogFunction("udf5", JavaFunc5.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "a BIGINT, b INT NOT NULL, c 
VARCHAR, d TIMESTAMP(3)")
+                                    .producedBeforeRestore(
+                                            Row.of(
+                                                    5L,
+                                                    11,
+                                                    "hello world",
+                                                    LocalDateTime.of(2023, 12, 
16, 1, 1, 1, 123)))
+                                    .producedAfterRestore(
+                                            Row.of(
+                                                    5L,
+                                                    11,
+                                                    "hello world",
+                                                    LocalDateTime.of(2023, 12, 
16, 1, 1, 1, 123)))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "a BIGINT",
+                                            "a1 VARCHAR",
+                                            "b INT NOT NULL",
+                                            "b1 VARCHAR",
+                                            "c1 VARCHAR",
+                                            "c2 VARCHAR",
+                                            "d1 TIMESTAMP(3)")
+                                    .consumedBeforeRestore(
+                                            Row.of(
+                                                    5L,
+                                                    "5",
+                                                    11,
+                                                    "11 and 11 and 
1702688461000",
+                                                    "hello world11",
+                                                    "$hello",
+                                                    LocalDateTime.of(2023, 12, 
16, 01, 01, 00, 0)))
+                                    .consumedAfterRestore(
+                                            Row.of(
+                                                    5L,
+                                                    "5",
+                                                    11,
+                                                    "11 and 11 and 
1702688461000",
+                                                    "hello world11",
+                                                    "$hello",
+                                                    LocalDateTime.of(2023, 12, 
16, 01, 01, 00, 0)))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "a, "
+                                    + "cast(a as VARCHAR) as a1, "
+                                    + "b, "
+                                    + "udf2(b, b, d) as b1, "
+                                    + "udf3(c, b) as c1, "
+                                    + "udf4(substring(c, 1, 5)) as c2, "
+                                    + "udf5(d, 1000) as d1 "
+                                    + "from source_t where "
+                                    + "(udf1(a) > 0 or (a * b) < 100) and b > 
10")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 23e0926140c..f5c370668e2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -88,7 +88,10 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
 
     @Override
     public EnumSet<TestKind> supportedSetupSteps() {
-        return EnumSet.of(TestKind.SOURCE_WITH_RESTORE_DATA, 
TestKind.SINK_WITH_RESTORE_DATA);
+        return EnumSet.of(
+                TestKind.FUNCTION,
+                TestKind.SOURCE_WITH_RESTORE_DATA,
+                TestKind.SINK_WITH_RESTORE_DATA);
     }
 
     @Override
@@ -110,7 +113,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         return getAllMetadata().stream()
                 .flatMap(
                         metadata ->
-                                supportedPrograms().stream().map(p -> 
Arguments.of(metadata, p)));
+                                supportedPrograms().stream().map(p -> 
Arguments.of(p, metadata)));
     }
 
     /**
@@ -161,6 +164,8 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
             sinkTestStep.apply(tEnv, options);
         }
 
+        program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+
         final SqlTestStep sqlTestStep = program.getRunSqlTestStep();
 
         final CompiledPlan compiledPlan = tEnv.compilePlanSql(sqlTestStep.sql);
@@ -182,7 +187,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
 
     @ParameterizedTest
     @MethodSource("createSpecs")
-    void testRestore(ExecNodeMetadata metadata, TableTestProgram program) 
throws Exception {
+    void testRestore(TableTestProgram program, ExecNodeMetadata metadata) 
throws Exception {
         final EnvironmentSettings settings = 
EnvironmentSettings.inStreamingMode();
         final SavepointRestoreSettings restoreSettings =
                 SavepointRestoreSettings.forPath(
@@ -213,6 +218,8 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
             sinkTestStep.apply(tEnv, options);
         }
 
+        program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+
         final CompiledPlan compiledPlan =
                 tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, 
metadata)));
         compiledPlan.execute().await();
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
new file mode 100644
index 00000000000..9327810d35e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
@@ -0,0 +1,92 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 0,
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 420,
+            "type" : "BIGINT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[>(a, 420:BIGINT)]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/savepoint/_metadata
new file mode 100644
index 00000000000..c2093c6d15d
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/plan/calc-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/plan/calc-filter.json
new file mode 100644
index 00000000000..b1d3b7bc1f9
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/plan/calc-filter.json
@@ -0,0 +1,141 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "d",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 0,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, b, c, d], where=[(b > 0)])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "d",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c, d])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/savepoint/_metadata
new file mode 100644
index 00000000000..fcefcf935ee
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
new file mode 100644
index 00000000000..dde65ade185
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
@@ -0,0 +1,135 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 0,
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 1,
+            "type" : "BIGINT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ]
+      }, {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ] ],
+        "producedType" : "ROW<`a` BIGINT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[>(a, 1:BIGINT)], project=[a], 
metadata=[]]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS 
EXPR$1])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "a1",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/savepoint/_metadata
new file mode 100644
index 00000000000..e259d1e9f35
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/plan/calc-sarg.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/plan/calc-sarg.json
new file mode 100644
index 00000000000..b8d71456481
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/plan/calc-sarg.json
@@ -0,0 +1,132 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "INTERNAL",
+      "internalName" : "$SEARCH$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "sarg" : {
+          "ranges" : [ {
+            "lower" : {
+              "value" : 1,
+              "boundType" : "CLOSED"
+            },
+            "upper" : {
+              "value" : 1,
+              "boundType" : "CLOSED"
+            }
+          }, {
+            "lower" : {
+              "value" : 2,
+              "boundType" : "CLOSED"
+            },
+            "upper" : {
+              "value" : 2,
+              "boundType" : "CLOSED"
+            }
+          } ],
+          "nullAs" : "TRUE"
+        },
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT>",
+    "description" : "Calc(select=[a], where=[SEARCH(a, Sarg[1, 2; NULL AS 
TRUE])])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/savepoint/_metadata
new file mode 100644
index 00000000000..98082d5d1c9
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
new file mode 100644
index 00000000000..ac6119b4966
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
@@ -0,0 +1,272 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf2`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "systemName" : "udf3",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "systemName" : "udf4",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$SUBSTRING$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "VARCHAR(2147483647)"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 1,
+          "type" : "INT NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 5,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf5`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1000,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$AND$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$OR$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "CALL",
+            "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 0,
+              "type" : "BIGINT"
+            } ],
+            "type" : "BIGINT NOT NULL"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$<$1",
+          "operands" : [ {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$*$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 0,
+              "type" : "BIGINT"
+            }, {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 1,
+              "type" : "INT NOT NULL"
+            } ],
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 100,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$>$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "INT NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 10,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT 
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2` 
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+    "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS a1, b, 
udf2(b, b, d) AS b1, udf3(c, b) AS c1, udf4(SUBSTRING(c, 1, 5)) AS c2, udf5(d, 
1000) AS d1], where=[(((udf1(a) > 0) OR ((a * b) < 100)) AND (b > 10))])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "a1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c2",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d1",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT 
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2` 
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, a1, b, b1, c1, c2, d1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/savepoint/_metadata
new file mode 100644
index 00000000000..79c262d72a5
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
new file mode 100644
index 00000000000..894d252d5b1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
@@ -0,0 +1,111 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CAST$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        } ],
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Calc(select=[a, udf1(CAST(a AS BIGINT)) AS EXPR$1])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "a1",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/savepoint/_metadata
new file mode 100644
index 00000000000..c110557cab5
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/savepoint/_metadata
 differ

Reply via email to