This is an automated email from the ASF dual-hosted git repository.
gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4490b3074e6 [FLINK-39258][table] Extend restore tests for TO_CHANGELOG and FROM_CHANGELOG
4490b3074e6 is described below
commit 4490b3074e67a6197177cacddde4149bd6597aee
Author: Gustavo de Morais <[email protected]>
AuthorDate: Wed Jun 3 17:03:29 2026 +0200
[FLINK-39258][table] Extend restore tests for TO_CHANGELOG and FROM_CHANGELOG
This closes #28278.
---
.../ToChangelogOutputTypeStrategyTest.java | 60 +++++-
.../exec/stream/FromChangelogRestoreTest.java | 5 +-
.../exec/stream/FromChangelogTestPrograms.java | 65 +++++++
.../nodes/exec/stream/ToChangelogRestoreTest.java | 7 +-
.../nodes/exec/stream/ToChangelogTestPrograms.java | 132 +++++++++++++
...rom-changelog-retract-partition-by-restore.json | 152 +++++++++++++++
.../savepoint/_metadata | Bin 0 -> 12564 bytes
...from-changelog-upsert-partition-by-restore.json | 183 ++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 9805 bytes
.../to-changelog-retract-partition-by-restore.json | 157 ++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 13814 bytes
...g-retract-produces-partial-deletes-restore.json | 135 ++++++++++++++
.../savepoint/_metadata | Bin 0 -> 8271 bytes
.../to-changelog-upsert-partition-by-restore.json | 207 +++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 18497 bytes
.../plan/to-changelog-upsert-restore.json | 185 ++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 12994 bytes
17 files changed, 1285 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
index e912370ef00..8c06c26d7bb 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
@@ -33,7 +33,12 @@ import static
org.apache.flink.table.types.inference.strategies.ToChangelogTypeS
import static
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES;
import static
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE;
-/** Tests for {@link ToChangelogTypeStrategy#OUTPUT_TYPE_STRATEGY}. */
+/**
+ * Tests for {@link ToChangelogTypeStrategy#OUTPUT_TYPE_STRATEGY}.
+ *
+ * <p>For background on row vs. set semantics and how PARTITION BY columns are handled, see the
+ * Process Table Functions page in the Flink documentation.
+ */
class ToChangelogOutputTypeStrategyTest extends TypeStrategiesTestBase {
private static final DataType TABLE_TYPE_NOT_NULL_SCORE =
@@ -47,6 +52,10 @@ class ToChangelogOutputTypeStrategyTest extends
TypeStrategiesTestBase {
@Override
protected Stream<TestSpec> testData() {
+ return Stream.concat(rowSemantics(), setSemantics());
+ }
+
+ private static Stream<TestSpec> rowSemantics() {
return Stream.of(
TestSpec.forStrategy(
"produces_full_deletes=true preserves NOT NULL
on input columns",
@@ -107,4 +116,53 @@ class ToChangelogOutputTypeStrategyTest extends
TypeStrategiesTestBase {
DataTypes.FIELD("score",
DataTypes.BIGINT()))
.notNull()));
}
+
+ private static Stream<TestSpec> setSemantics() {
+ return Stream.of(
+ TestSpec.forStrategy(
+ "produces_full_deletes=true in set semantics
preserves NOT NULL on non-partition columns",
+ TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+ .inputTypes(
+ TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE,
MAP_TYPE, BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(
+ ARG_TABLE,
+ new TableSemanticsMock(
+ TABLE_TYPE_NOT_NULL_SCORE,
+ new int[] {0},
+ new int[0],
+ -1,
+ null,
+ List.of(new int[] {0})))
+ .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+ .calledWithLiteralAt(ARG_OP_MAPPING, null)
+ .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+ .expectDataType(
+ DataTypes.ROW(
+ DataTypes.FIELD("op",
DataTypes.STRING()),
+ DataTypes.FIELD(
+ "score",
DataTypes.BIGINT().notNull()))
+ .notNull()),
+ TestSpec.forStrategy(
+ "produces_full_deletes=false in set semantics
widens non-partition-key columns",
+ TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+ .inputTypes(
+ TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE,
MAP_TYPE, BOOLEAN_TYPE)
+ .calledWithTableSemanticsAt(
+ ARG_TABLE,
+ new TableSemanticsMock(
+ TABLE_TYPE_NOT_NULL_SCORE,
+ new int[] {0},
+ new int[0],
+ -1,
+ null,
+ List.of(new int[] {0})))
+ .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+ .calledWithLiteralAt(ARG_OP_MAPPING, null)
+ .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false)
+ .expectDataType(
+ DataTypes.ROW(
+ DataTypes.FIELD("op",
DataTypes.STRING()),
+ DataTypes.FIELD("score",
DataTypes.BIGINT()))
+ .notNull()));
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
index 7cd25c35ab5..96fd1450928 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
@@ -35,6 +35,9 @@ public class FromChangelogRestoreTest extends RestoreTestBase
{
@Override
public List<TableTestProgram> programs() {
- return List.of(FromChangelogTestPrograms.RETRACT_RESTORE);
+ return List.of(
+ FromChangelogTestPrograms.RETRACT_RESTORE,
+ FromChangelogTestPrograms.RETRACT_PARTITION_BY_RESTORE,
+ FromChangelogTestPrograms.UPSERT_PARTITION_BY_RESTORE);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 2ab92f052d7..5fa3956de3b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -391,6 +391,71 @@ public class FromChangelogTestPrograms {
+ "input => TABLE cdc_stream)")
.build();
+ public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+ TableTestProgram.of(
+ "from-changelog-retract-partition-by-restore",
+ "FROM_CHANGELOG with PARTITION BY producing a
retract changelog")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema("name STRING", "id INT", "op
STRING")
+ .producedBeforeRestore(
+ Row.of("Alice", 1, "INSERT"),
+ Row.of("Bob", 2, "INSERT"))
+ .producedAfterRestore(
+ Row.of("Alice", 1,
"UPDATE_BEFORE"),
+ Row.of("Alice2", 1,
"UPDATE_AFTER"),
+ Row.of("Bob", 2, "DELETE"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedBeforeRestore(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"))
+ .consumedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_BEFORE,
1, "Alice"),
+ Row.ofKind(RowKind.UPDATE_AFTER,
1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream PARTITION BY
id)")
+ .build();
+
+ public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+ TableTestProgram.of(
+ "from-changelog-upsert-partition-by-restore",
+ "FROM_CHANGELOG with PARTITION BY producing an
upsert changelog "
+ + "(op_mapping without UPDATE_BEFORE)")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema("name STRING", "id INT", "op
STRING")
+ .producedBeforeRestore(
+ Row.of("Alice", 1, "INSERT"),
+ Row.of("Bob", 2, "INSERT"))
+ .producedAfterRestore(
+ Row.of("Alice2", 1,
"UPDATE_AFTER"),
+ Row.of("Bob", 2, "DELETE"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT PRIMARY KEY NOT
ENFORCED", "name STRING")
+ .consumedBeforeRestore(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"))
+ .consumedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_AFTER,
1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream PARTITION BY
id, "
+ + "op_mapping => MAP["
+ + "'INSERT', 'INSERT', "
+ + "'UPDATE_AFTER', 'UPDATE_AFTER', "
+ + "'DELETE', 'DELETE'])")
+ .build();
+
//
--------------------------------------------------------------------------------------------
// Error validation tests
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
index 5b7258946d7..00205798b65 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
@@ -35,6 +35,11 @@ public class ToChangelogRestoreTest extends RestoreTestBase {
@Override
public List<TableTestProgram> programs() {
- return List.of(ToChangelogTestPrograms.RETRACT_RESTORE);
+ return List.of(
+ ToChangelogTestPrograms.RETRACT_RESTORE,
+ ToChangelogTestPrograms.UPSERT_RESTORE,
+ ToChangelogTestPrograms.RETRACT_PARTITION_BY_RESTORE,
+ ToChangelogTestPrograms.UPSERT_PARTITION_BY_RESTORE,
+
ToChangelogTestPrograms.RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 71731109eda..a14b259b068 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -123,6 +123,138 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
+ public static final TableTestProgram UPSERT_RESTORE =
+ TableTestProgram.of(
+ "to-changelog-upsert-restore",
+ "TO_CHANGELOG over an upsert source, including
ChangelogNormalize state")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.upsert())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ // Key-only delete: ChangelogNormalize restores
+ // Bob's pre-image from state.
+ Row.ofKind(RowKind.DELETE, "Bob",
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedBeforeRestore(
+ "+I[INSERT, Alice, 10]",
"+I[INSERT, Bob, 20]")
+ .consumedAfterRestore(
+ "+I[UPDATE_BEFORE, Alice, 10]",
+ "+I[UPDATE_AFTER, Alice, 30]",
+ "+I[DELETE, Bob, 20]")
+ .build())
+ .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
+ .build();
+
+ public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+ TableTestProgram.of(
+ "to-changelog-retract-partition-by-restore",
+ "TO_CHANGELOG over a retract source with PARTITION
BY")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob",
20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING NOT NULL", "op
STRING", "score BIGINT")
+ .consumedBeforeRestore(
+ "+I[Alice, INSERT, 10]", "+I[Bob,
INSERT, 20]")
+ .consumedAfterRestore(
+ "+I[Alice, UPDATE_BEFORE, 10]",
+ "+I[Alice, UPDATE_AFTER, 30]",
+ "+I[Bob, DELETE, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY name)")
+ .build();
+
+ public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+ TableTestProgram.of(
+ "to-changelog-upsert-partition-by-restore",
+ "TO_CHANGELOG over an upsert source with PARTITION
BY, including "
+ + "ChangelogNormalize state")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.upsert())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ // Key-only delete: ChangelogNormalize restores
+ // Bob's pre-image from state.
+ Row.ofKind(RowKind.DELETE, "Bob",
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING NOT NULL", "op
STRING", "score BIGINT")
+ .consumedBeforeRestore(
+ "+I[Alice, INSERT, 10]", "+I[Bob,
INSERT, 20]")
+ .consumedAfterRestore(
+ "+I[Alice, UPDATE_BEFORE, 10]",
+ "+I[Alice, UPDATE_AFTER, 30]",
+ "+I[Bob, DELETE, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY name)")
+ .build();
+
+ public static final TableTestProgram
RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE =
+ TableTestProgram.of(
+
"to-changelog-retract-produces-partial-deletes-restore",
+ "TO_CHANGELOG with produces_full_deletes=false
over a NOT NULL input "
+ + "column; non-key columns are widened to
nullable on DELETE")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED",
+ "score BIGINT NOT NULL")
+ .addMode(ChangelogMode.all())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob",
20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedBeforeRestore(
+ "+I[INSERT, Alice, 10]",
"+I[INSERT, Bob, 20]")
+ .consumedAfterRestore(
+ "+I[UPDATE_BEFORE, Alice, 10]",
+ "+I[UPDATE_AFTER, Alice, 30]",
+ "+I[DELETE, Bob, null]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t, "
+ + "produces_full_deletes => false)")
+ .build();
+
/** Partitions by a non-leading column ({@code id}, the middle column of three). */
public static final TableTestProgram RETRACT_PARTITION_BY =
TableTestProgram.of(
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/plan/from-changelog-retract-partition-by-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/plan/from-changelog-retract-partition-by-restore.json
new file mode 100644
index 00000000000..4de3e05d438
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/plan/from-changelog-retract-partition-by-restore.json
@@ -0,0 +1,152 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 4,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`cdc_stream`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, cdc_stream]], fields=[name, id, op])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op`
VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[hash[id]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+ "description" : "ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0)
PARTITION BY($1), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())],
uid=[FROM_CHANGELOG], select=[id,name], rowType=[RecordType(INTEGER id,
VARCHAR(2147483647) name)])",
+ "uid" : "FROM_CHANGELOG",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$FROM_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "partitionKeys" : [ 1 ],
+ "type" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op`
VARCHAR(2147483647)> NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`id` INT, `name` VARCHAR(2147483647)> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT" ] ],
+ "outputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ],
+ "inputUpsertKeys" : [ [ ] ]
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[id, name])"
+ } ],
+ "edges" : [ {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..422977ab9b2
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/plan/from-changelog-upsert-partition-by-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/plan/from-changelog-upsert-partition-by-restore.json
new file mode 100644
index 00000000000..d771bad2480
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/plan/from-changelog-upsert-partition-by-restore.json
@@ -0,0 +1,183 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 8,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`cdc_stream`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, cdc_stream]], fields=[name, id, op])"
+ }, {
+ "id" : 9,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 1 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op`
VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[hash[id]])"
+ }, {
+ "id" : 10,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+ "description" : "ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0)
PARTITION BY($1), DEFAULT(), MAP(_UTF-16LE'INSERT':VARCHAR(12) CHARACTER SET
\"UTF-16LE\", _UTF-16LE'INSERT':VARCHAR(12) CHARACTER SET \"UTF-16LE\",
_UTF-16LE'UPDATE_AFTER':VARCHAR(12) CHARACTER SET \"UTF-16LE\",
_UTF-16LE'UPDATE_AFTER':VARCHAR(12) CHARACTER SET \"UTF-16LE\",
_UTF-16LE'DELETE':VARCHAR(12) CHARACTER SET \"UTF-16LE\",
_UTF-16LE'DELETE':VARCHAR(12) CHARACTER SET \"UTF-16LE\"), DEFAULT(), DEFAULT(
[...]
+ "uid" : "FROM_CHANGELOG",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$FROM_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "partitionKeys" : [ 1 ],
+ "type" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op`
VARCHAR(2147483647)> NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$MAP$1",
+ "operands" : [ {
+ "kind" : "LITERAL",
+ "value" : "INSERT",
+ "type" : "VARCHAR(12) NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "INSERT",
+ "type" : "VARCHAR(12) NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "UPDATE_AFTER",
+ "type" : "VARCHAR(12) NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "UPDATE_AFTER",
+ "type" : "VARCHAR(12) NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "DELETE",
+ "type" : "VARCHAR(12) NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "DELETE",
+ "type" : "VARCHAR(12) NOT NULL"
+ } ],
+ "type" : "MAP<VARCHAR(12) NOT NULL, VARCHAR(12) NOT NULL> NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`id` INT, `name` VARCHAR(2147483647)> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT" ] ],
+ "outputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+ "inputUpsertKeys" : [ [ ] ]
+ }, {
+ "id" : 11,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "primaryKey" : {
+ "name" : "PK_id",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "id" ]
+ }
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputUpsertKey" : [ 0 ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[id, name])"
+ } ],
+ "edges" : [ {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..2914912731c
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/plan/to-changelog-retract-partition-by-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/plan/to-changelog-retract-partition-by-restore.json
new file mode 100644
index 00000000000..76741ae6d82
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/plan/to-changelog-retract-partition-by-restore.json
@@ -0,0 +1,157 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 9,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ],
+ "primaryKey" : {
+ "name" : "PK_name",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "name" ]
+ }
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[name, score])"
+ }, {
+ "id" : 10,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "Exchange(distribution=[hash[name]])"
+ }, {
+ "id" : 11,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op`
VARCHAR(2147483647), `score` BIGINT>",
+ "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0)
PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())],
uid=[TO_CHANGELOG], select=[name,op,score],
rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) op, BIGINT
score)])",
+ "uid" : "TO_CHANGELOG",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "partitionKeys" : [ 0 ],
+ "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT
NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op`
VARCHAR(2147483647), `score` BIGINT> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ] ],
+ "outputChangelogMode" : [ "INSERT" ],
+ "inputUpsertKeys" : [ [ [ 0 ] ] ]
+ }, {
+ "id" : 12,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op`
VARCHAR(2147483647), `score` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[name, op, score])"
+ } ],
+ "edges" : [ {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..6c2826c7177
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/plan/to-changelog-retract-produces-partial-deletes-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/plan/to-changelog-retract-produces-partial-deletes-restore.json
new file mode 100644
index 00000000000..83e5963980c
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/plan/to-changelog-retract-produces-partial-deletes-restore.json
@@ -0,0 +1,135 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 19,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT NOT NULL"
+ } ],
+ "primaryKey" : {
+ "name" : "PK_name",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "name" ]
+ }
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT
NOT NULL>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[name, score])"
+ }, {
+ "id" : 20,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647),
`score` BIGINT>",
+ "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0),
DEFAULT(), DEFAULT(), false, DEFAULT(), DEFAULT())], uid=[null],
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op,
VARCHAR(2147483647) name, BIGINT score)])",
+ "uid" : null,
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT NOT
NULL> NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+ }, {
+ "kind" : "LITERAL",
+ "value" : false,
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647),
`score` BIGINT> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ] ],
+ "outputChangelogMode" : [ "INSERT" ],
+ "inputUpsertKeys" : [ [ [ 0 ] ] ]
+ }, {
+ "id" : 21,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647),
`score` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[op, name, score])"
+ } ],
+ "edges" : [ {
+ "source" : 19,
+ "target" : 20,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 20,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/savepoint/_metadata
new file mode 100644
index 00000000000..fad2d14492d
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/plan/to-changelog-upsert-partition-by-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/plan/to-changelog-upsert-partition-by-restore.json
new file mode 100644
index 00000000000..1d3991ca886
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/plan/to-changelog-upsert-partition-by-restore.json
@@ -0,0 +1,207 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 13,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ],
+ "primaryKey" : {
+ "name" : "PK_name",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "name" ]
+ }
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[name, score])"
+ }, {
+ "id" : 14,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "Exchange(distribution=[hash[name]])"
+ }, {
+ "id" : 15,
+ "type" : "stream-exec-changelog-normalize_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "uniqueKeys" : [ 0 ],
+ "generateUpdateBefore" : true,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "changelogNormalizeState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "ChangelogNormalize(key=[name])"
+ }, {
+ "id" : 16,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "Exchange(distribution=[hash[name]])"
+ }, {
+ "id" : 17,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op`
VARCHAR(2147483647), `score` BIGINT>",
+ "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0)
PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())],
uid=[TO_CHANGELOG], select=[name,op,score],
rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) op, BIGINT
score)])",
+ "uid" : "TO_CHANGELOG",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "partitionKeys" : [ 0 ],
+ "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT
NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op`
VARCHAR(2147483647), `score` BIGINT> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ] ],
+ "outputChangelogMode" : [ "INSERT" ],
+ "inputUpsertKeys" : [ [ [ 0 ] ] ]
+ }, {
+ "id" : 18,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op`
VARCHAR(2147483647), `score` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[name, op, score])"
+ } ],
+ "edges" : [ {
+ "source" : 13,
+ "target" : 14,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 14,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 15,
+ "target" : 16,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 16,
+ "target" : 17,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 17,
+ "target" : 18,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..50eef475775
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/plan/to-changelog-upsert-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/plan/to-changelog-upsert-restore.json
new file mode 100644
index 00000000000..aa61aa93f01
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/plan/to-changelog-upsert-restore.json
@@ -0,0 +1,185 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 4,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647) NOT NULL"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ],
+ "primaryKey" : {
+ "name" : "PK_name",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "name" ]
+ }
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[name, score])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "Exchange(distribution=[hash[name]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-changelog-normalize_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "uniqueKeys" : [ 0 ],
+ "generateUpdateBefore" : true,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "changelogNormalizeState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+ "description" : "ChangelogNormalize(key=[name])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647)
NOT NULL, `score` BIGINT>",
+ "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null],
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op,
VARCHAR(2147483647) name, BIGINT score)])",
+ "uid" : null,
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_CHANGELOG$1",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT
NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "DESCRIPTOR"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT
NULL, `score` BIGINT> NOT NULL"
+ },
+ "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ] ],
+ "outputChangelogMode" : [ "INSERT" ],
+ "inputUpsertKeys" : [ [ [ 0 ] ] ]
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "op",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "dataType" : "BIGINT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647)
NOT NULL, `score` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink],
fields=[op, name, score])"
+ } ],
+ "edges" : [ {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/savepoint/_metadata
new file mode 100644
index 00000000000..af7f44744e7
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/savepoint/_metadata
differ