This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 898935d75b2 [FLINK-33422] Implement restore tests for Calc node
898935d75b2 is described below
commit 898935d75b2fc073bcee61758b3b0443cba584ca
Author: bvarghese1 <[email protected]>
AuthorDate: Mon Oct 30 17:20:41 2023 -0700
[FLINK-33422] Implement restore tests for Calc node
---
.../flink/table/test/program/TableTestProgram.java | 5 +
.../plan/nodes/exec/testutils/CalcRestoreTest.java | 11 +-
.../nodes/exec/testutils/CalcTestPrograms.java | 166 +++++++++++++
.../plan/nodes/exec/testutils/RestoreTestBase.java | 13 +-
.../plan/calc-filter-pushdown.json | 92 +++++++
.../calc-filter-pushdown/savepoint/_metadata | Bin 0 -> 6981 bytes
.../calc-filter/plan/calc-filter.json | 141 +++++++++++
.../calc-filter/savepoint/_metadata | Bin 0 -> 9372 bytes
.../plan/calc-project-pushdown.json | 135 ++++++++++
.../calc-project-pushdown/savepoint/_metadata | Bin 0 -> 7090 bytes
.../calc-sarg/plan/calc-sarg.json | 132 ++++++++++
.../calc-sarg/savepoint/_metadata | Bin 0 -> 5874 bytes
.../calc-udf-complex/plan/calc-udf-complex.json | 272 +++++++++++++++++++++
.../calc-udf-complex/savepoint/_metadata | Bin 0 -> 13385 bytes
.../calc-udf-simple/plan/calc-udf-simple.json | 111 +++++++++
.../calc-udf-simple/savepoint/_metadata | Bin 0 -> 7016 bytes
16 files changed, 1073 insertions(+), 5 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
index 731c967bba1..5af08b29998 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -103,6 +103,11 @@ public class TableTestProgram {
this.runSteps = runSteps;
}
+ @Override
+ public String toString() {
+ return id;
+ }
+
/**
* Entrypoint for a {@link TableTestProgram} that forces an identifier and
description of the
* test program.
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
index 117316aa8e1..2b408bfcd67 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcRestoreTest.java
@@ -21,7 +21,7 @@ package
org.apache.flink.table.planner.plan.nodes.exec.testutils;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
import org.apache.flink.table.test.program.TableTestProgram;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
/** Restore tests for {@link StreamExecCalc}. */
@@ -33,6 +33,13 @@ public class CalcRestoreTest extends RestoreTestBase {
@Override
public List<TableTestProgram> programs() {
- return Collections.singletonList(CalcTestPrograms.SIMPLE_CALC);
+ return Arrays.asList(
+ CalcTestPrograms.SIMPLE_CALC,
+ CalcTestPrograms.CALC_FILTER,
+ CalcTestPrograms.CALC_FILTER_PUSHDOWN,
+ CalcTestPrograms.CALC_PROJECT_PUSHDOWN,
+ CalcTestPrograms.CALC_SARG,
+ CalcTestPrograms.CALC_UDF_SIMPLE,
+ CalcTestPrograms.CALC_UDF_COMPLEX);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
index feb151ba9ef..51d23f3183c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestPrograms.java
@@ -19,11 +19,18 @@
package org.apache.flink.table.planner.plan.nodes.exec.testutils;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc1;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc2;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.UdfWithOpen;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
+import java.time.LocalDateTime;
+
/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */
public class CalcTestPrograms {
@@ -43,4 +50,163 @@ public class CalcTestPrograms {
.build())
.runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
.build();
+
+ static final TableTestProgram CALC_PROJECT_PUSHDOWN =
+ TableTestProgram.of(
+ "calc-project-pushdown", "validates calc node with
project pushdown")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a BIGINT", "b DOUBLE")
+ .addOption("filterable-fields", "a")
+ .producedBeforeRestore(Row.of(421L, 42.1))
+ .producedAfterRestore(Row.of(421L, 42.1))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a BIGINT", "a1 VARCHAR")
+ .consumedBeforeRestore(Row.of(421L, "421"))
+ .consumedAfterRestore(Row.of(421L, "421"))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT a, CAST(a AS VARCHAR)
FROM source_t WHERE a > CAST(1 AS BIGINT)")
+ .build();
+
+ static final TableTestProgram CALC_FILTER =
+ TableTestProgram.of("calc-filter", "validates calc node with
filter")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a BIGINT", "b INT", "c
DOUBLE", "d VARCHAR")
+ .producedBeforeRestore(Row.of(420L, 1,
42.0, "hello"))
+ .producedAfterRestore(Row.of(420L, 1,
42.0, "hello"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a BIGINT", "b INT", "c
DOUBLE", "d VARCHAR")
+ .consumedBeforeRestore(Row.of(420L, 1,
42.0, "hello"))
+ .consumedAfterRestore(Row.of(420L, 1,
42.0, "hello"))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT * FROM source_t WHERE b
> 0")
+ .build();
+
+ static final TableTestProgram CALC_FILTER_PUSHDOWN =
+ TableTestProgram.of("calc-filter-pushdown", "validates calc node
with filter pushdown")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a BIGINT", "b DOUBLE")
+ .addOption("filterable-fields", "a")
+ .producedBeforeRestore(Row.of(421L, 42.1))
+ .producedAfterRestore(Row.of(421L, 42.1))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a BIGINT", "b DOUBLE")
+ .consumedBeforeRestore(Row.of(421L, 42.1))
+ .consumedAfterRestore(Row.of(421L, 42.1))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT a, b FROM source_t
WHERE a > CAST(420 AS BIGINT)")
+ .build();
+
+ static final TableTestProgram CALC_SARG =
+ TableTestProgram.of("calc-sarg", "validates calc node with Sarg")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT")
+ .addOption("filterable-fields", "a")
+ .producedBeforeRestore(Row.of(1))
+ .producedAfterRestore(Row.of(1))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT")
+ .consumedBeforeRestore(Row.of(1))
+ .consumedAfterRestore(Row.of(1))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT a FROM source_t WHERE a
= 1 or a = 2 or a is null")
+ .build();
+
+ static final TableTestProgram CALC_UDF_SIMPLE =
+ TableTestProgram.of("calc-udf-simple", "validates calc node with
simple UDF")
+ .setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT")
+ .producedBeforeRestore(Row.of(5))
+ .producedAfterRestore(Row.of(5))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT", "a1 BIGINT")
+ .consumedBeforeRestore(Row.of(5, 6L))
+ .consumedAfterRestore(Row.of(5, 6L))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT a, udf1(a) FROM
source_t")
+ .build();
+
+ static final TableTestProgram CALC_UDF_COMPLEX =
+ TableTestProgram.of("calc-udf-complex", "validates calc node with
complex UDFs")
+ .setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
+ .setupTemporaryCatalogFunction("udf2", JavaFunc1.class)
+ .setupTemporarySystemFunction("udf3", JavaFunc2.class)
+ .setupTemporarySystemFunction("udf4", UdfWithOpen.class)
+ .setupCatalogFunction("udf5", JavaFunc5.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(
+ "a BIGINT, b INT NOT NULL, c
VARCHAR, d TIMESTAMP(3)")
+ .producedBeforeRestore(
+ Row.of(
+ 5L,
+ 11,
+ "hello world",
+ LocalDateTime.of(2023, 12,
16, 1, 1, 1, 123)))
+ .producedAfterRestore(
+ Row.of(
+ 5L,
+ 11,
+ "hello world",
+ LocalDateTime.of(2023, 12,
16, 1, 1, 1, 123)))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "a BIGINT",
+ "a1 VARCHAR",
+ "b INT NOT NULL",
+ "b1 VARCHAR",
+ "c1 VARCHAR",
+ "c2 VARCHAR",
+ "d1 TIMESTAMP(3)")
+ .consumedBeforeRestore(
+ Row.of(
+ 5L,
+ "5",
+ 11,
+ "11 and 11 and
1702688461000",
+ "hello world11",
+ "$hello",
+ LocalDateTime.of(2023, 12,
16, 01, 01, 00, 0)))
+ .consumedAfterRestore(
+ Row.of(
+ 5L,
+ "5",
+ 11,
+ "11 and 11 and
1702688461000",
+ "hello world11",
+ "$hello",
+ LocalDateTime.of(2023, 12,
16, 01, 01, 00, 0)))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT "
+ + "a, "
+ + "cast(a as VARCHAR) as a1, "
+ + "b, "
+ + "udf2(b, b, d) as b1, "
+ + "udf3(c, b) as c1, "
+ + "udf4(substring(c, 1, 5)) as c2, "
+ + "udf5(d, 1000) as d1 "
+ + "from source_t where "
+ + "(udf1(a) > 0 or (a * b) < 100) and b >
10")
+ .build();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 23e0926140c..f5c370668e2 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -88,7 +88,10 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
@Override
public EnumSet<TestKind> supportedSetupSteps() {
- return EnumSet.of(TestKind.SOURCE_WITH_RESTORE_DATA,
TestKind.SINK_WITH_RESTORE_DATA);
+ return EnumSet.of(
+ TestKind.FUNCTION,
+ TestKind.SOURCE_WITH_RESTORE_DATA,
+ TestKind.SINK_WITH_RESTORE_DATA);
}
@Override
@@ -110,7 +113,7 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
return getAllMetadata().stream()
.flatMap(
metadata ->
- supportedPrograms().stream().map(p ->
Arguments.of(metadata, p)));
+ supportedPrograms().stream().map(p ->
Arguments.of(p, metadata)));
}
/**
@@ -161,6 +164,8 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
sinkTestStep.apply(tEnv, options);
}
+ program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+
final SqlTestStep sqlTestStep = program.getRunSqlTestStep();
final CompiledPlan compiledPlan = tEnv.compilePlanSql(sqlTestStep.sql);
@@ -182,7 +187,7 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
@ParameterizedTest
@MethodSource("createSpecs")
- void testRestore(ExecNodeMetadata metadata, TableTestProgram program)
throws Exception {
+ void testRestore(TableTestProgram program, ExecNodeMetadata metadata)
throws Exception {
final EnvironmentSettings settings =
EnvironmentSettings.inStreamingMode();
final SavepointRestoreSettings restoreSettings =
SavepointRestoreSettings.forPath(
@@ -213,6 +218,8 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
sinkTestStep.apply(tEnv, options);
}
+ program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+
final CompiledPlan compiledPlan =
tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program,
metadata)));
compiledPlan.execute().await();
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
new file mode 100644
index 00000000000..9327810d35e
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
@@ -0,0 +1,92 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 420,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[>(a, 420:BIGINT)]]], fields=[a, b])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, b])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/savepoint/_metadata
new file mode 100644
index 00000000000..c2093c6d15d
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter-pushdown/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/plan/calc-filter.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/plan/calc-filter.json
new file mode 100644
index 00000000000..b1d3b7bc1f9
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/plan/calc-filter.json
@@ -0,0 +1,141 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "DOUBLE"
+ }, {
+ "name" : "d",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "DOUBLE"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[a, b, c, d], where=[(b > 0)])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "DOUBLE"
+ }, {
+ "name" : "d",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d`
VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, b, c, d])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/savepoint/_metadata
new file mode 100644
index 00000000000..fcefcf935ee
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-filter/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
new file mode 100644
index 00000000000..dde65ade185
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
@@ -0,0 +1,135 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ]
+ }, {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ] ],
+ "producedType" : "ROW<`a` BIGINT> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` BIGINT> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[>(a, 1:BIGINT)], project=[a],
metadata=[]]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS
EXPR$1])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "a1",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/savepoint/_metadata
new file mode 100644
index 00000000000..e259d1e9f35
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-project-pushdown/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/plan/calc-sarg.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/plan/calc-sarg.json
new file mode 100644
index 00000000000..b8d71456481
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/plan/calc-sarg.json
@@ -0,0 +1,132 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "INTERNAL",
+ "internalName" : "$SEARCH$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "sarg" : {
+ "ranges" : [ {
+ "lower" : {
+ "value" : 1,
+ "boundType" : "CLOSED"
+ },
+ "upper" : {
+ "value" : 1,
+ "boundType" : "CLOSED"
+ }
+ }, {
+ "lower" : {
+ "value" : 2,
+ "boundType" : "CLOSED"
+ },
+ "upper" : {
+ "value" : 2,
+ "boundType" : "CLOSED"
+ }
+ } ],
+ "nullAs" : "TRUE"
+ },
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT>",
+ "description" : "Calc(select=[a], where=[SEARCH(a, Sarg[1, 2; NULL AS
TRUE])])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/savepoint/_metadata
new file mode 100644
index 00000000000..98082d5d1c9
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-sarg/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
new file mode 100644
index 00000000000..ac6119b4966
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
@@ -0,0 +1,272 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`d` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf2`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "systemName" : "udf3",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "systemName" : "udf4",
+ "operands" : [ {
+ "kind" : "CALL",
+ "internalName" : "$SUBSTRING$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 5,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf5`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1000,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$AND$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$OR$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$<$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 100,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 10,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2`
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+ "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS a1, b,
udf2(b, b, d) AS b1, udf3(c, b) AS c1, udf4(SUBSTRING(c, 1, 5)) AS c2, udf5(d,
1000) AS d1], where=[(((udf1(a) > 0) OR ((a * b) < 100)) AND (b > 10))])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "a1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c2",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d1",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2`
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, a1, b, b1, c1, c2, d1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/savepoint/_metadata
new file mode 100644
index 00000000000..79c262d72a5
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-complex/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
new file mode 100644
index 00000000000..894d252d5b1
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
@@ -0,0 +1,111 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+ "description" : "Calc(select=[a, udf1(CAST(a AS BIGINT)) AS EXPR$1])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "a1",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/savepoint/_metadata
new file mode 100644
index 00000000000..c110557cab5
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/calc-udf-simple/savepoint/_metadata
differ