This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a9b9ce81ca05398f8891c918c74294402462f5c
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Wed Jan 3 17:24:19 2024 -0800

    [FLINK-33979] Implement restore tests for TableSink node
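
    Adds restore coverage (compiled plan + savepoint) for StreamExecSink
    through five TableTestPrograms: sink-partition, sink-overwrite,
    sink-writing-metadata, sink-ndf-primary-key and sink-partial-insert.
    TestValuesTableSink now also implements SupportsOverwrite, and
    NonDeterministicUdf is seeded so its output stays stable across runs.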
---
 .../planner/factories/TestValuesTableFactory.java  |   9 +-
 .../nodes/exec/stream/TableSinkRestoreTest.java    |  43 ++++++
 .../nodes/exec/stream/TableSinkTestPrograms.java   | 158 +++++++++++++++++++++
 .../utils/JavaUserDefinedScalarFunctions.java      |   2 +-
 .../plan/sink-ndf-primary-key.json                 | 123 ++++++++++++++++
 .../sink-ndf-primary-key/savepoint/_metadata       | Bin 0 -> 5080 bytes
 .../sink-overwrite/plan/sink-overwrite.json        |  84 +++++++++++
 .../sink-overwrite/savepoint/_metadata             | Bin 0 -> 8381 bytes
 .../plan/sink-partial-insert.json                  | 128 +++++++++++++++++
 .../sink-partial-insert/savepoint/_metadata        | Bin 0 -> 11034 bytes
 .../sink-partition/plan/sink-partition.json        | 126 ++++++++++++++++
 .../sink-partition/savepoint/_metadata             | Bin 0 -> 9435 bytes
 .../plan/sink-writing-metadata.json                |  87 ++++++++++++
 .../sink-writing-metadata/savepoint/_metadata      | Bin 0 -> 8331 bytes
 14 files changed, 758 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 8511e30ce3c..3dbf4d5b9c0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -51,6 +51,7 @@ import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.OutputFormatProvider;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
@@ -1937,7 +1938,10 @@ public final class TestValuesTableFactory
 
     /** Values {@link DynamicTableSink} for testing. */
     private static class TestValuesTableSink
-            implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning {
+            implements DynamicTableSink,
+                    SupportsWritingMetadata,
+                    SupportsPartitioning,
+                    SupportsOverwrite {
 
         private DataType consumedDataType;
         private int[] primaryKeyIndices;
@@ -2135,6 +2139,9 @@ public final class TestValuesTableFactory
         public boolean requiresPartitionGrouping(boolean supportsGrouping) {
             return supportsGrouping;
         }
+
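+        // No-op: the in-memory test sink only needs to accept the overwrite flag
+        // so that INSERT OVERWRITE plans can be compiled against it.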
+        @Override
+        public void applyOverwrite(boolean overwrite) {}
     }
 
     /** A TableSink used for testing the implementation of {@link SinkFunction.Context}. */
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
new file mode 100644
index 00000000000..1ab3651cf09
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecSink}. */
+public class TableSinkRestoreTest extends RestoreTestBase {
+
+    public TableSinkRestoreTest() {
+        super(StreamExecSink.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                TableSinkTestPrograms.SINK_PARTITION,
+                TableSinkTestPrograms.SINK_OVERWRITE,
+                TableSinkTestPrograms.SINK_WRITING_METADATA,
+                TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY,
+                TableSinkTestPrograms.SINK_PARTIAL_INSERT);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java
new file mode 100644
index 00000000000..3f5a51922fe
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. */
+public class TableSinkTestPrograms {
+
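+    // Rows produced by the source before the savepoint is taken.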
+    static final Row[] BEFORE_DATA = {
+        Row.of(1, 1L, "hi"), Row.of(2, 2L, "hello"), Row.of(3, 2L, "hello world")
+    };
+
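+    // Rows produced by the source after restoring from the savepoint.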
+    static final Row[] AFTER_DATA = {Row.of(4, 4L, "foo"), Row.of(5, 2L, "foo bar")};
+
+    static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c VARCHAR"};
+
+    static final TableTestProgram SINK_PARTITION =
+            TableTestProgram.of("sink-partition", "validates sink partition")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "a INT", "b BIGINT", "p BIGINT NOT NULL", "c VARCHAR")
+                                    .addPartitionKeys("b")
+                                    .addOption("partition-list", "b:1;b:2;b:3;b:4")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 2, 1, hi]",
+                                            "+I[2, 2, 2, hello]",
+                                            "+I[3, 2, 2, hello world]")
+                                    .consumedAfterRestore(
+                                            "+I[4, 2, 4, foo]", "+I[5, 2, 2, foo bar]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t PARTITION (b=2) SELECT * FROM source_t")
+                    .build();
+
+    static final TableTestProgram SINK_OVERWRITE =
+            TableTestProgram.of("sink-overwrite", "validates sink with overwrite")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b BIGINT", "c VARCHAR")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, hi]",
+                                            "+I[2, 2, hello]",
+                                            "+I[3, 2, hello world]")
+                                    .consumedAfterRestore("+I[4, 4, foo]", "+I[5, 2, foo bar]")
+                                    .build())
+                    .runSql("INSERT OVERWRITE sink_t SELECT * FROM source_t")
+                    .build();
+    static final TableTestProgram SINK_WRITING_METADATA =
+            TableTestProgram.of("sink-writing-metadata", "validates writing metadata to sink")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b BIGINT", "c VARCHAR METADATA")
+                                    .addOption("writable-metadata", "c:STRING")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, hi]",
+                                            "+I[2, 2, hello]",
+                                            "+I[3, 2, hello world]")
+                                    .consumedAfterRestore("+I[4, 4, foo]", "+I[5, 2, foo bar]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+                    .build();
+
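+    // The numeric suffixes in the expected rows below are appended by
+    // NonDeterministicUdf, which uses a fixed seed for reproducibility.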
+    static final TableTestProgram SINK_NDF_PRIMARY_KEY =
+            TableTestProgram.of(
+                            "sink-ndf-primary-key",
+                            "validates sink with ndf and different primary key")
+                    .setupTemporaryCatalogFunction(
+                            "ndf", JavaUserDefinedScalarFunctions.NonDeterministicUdf.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "a INT",
+                                            "b BIGINT",
+                                            "c VARCHAR",
+                                            "PRIMARY KEY(c) NOT ENFORCED")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, hi--1170105035]",
+                                            "+I[2, 2, hello-234785527]",
+                                            "+I[3, 2, hello world--1360544799]")
+                                    .consumedAfterRestore(
+                                            "+I[4, 4, foo--1170105035]",
+                                            "+I[5, 2, foo bar-234785527]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, ndf(c) FROM source_t")
+                    .build();
+
+    static final TableTestProgram SINK_PARTIAL_INSERT =
+            TableTestProgram.of("sink-partial-insert", "validates sink with partial insert")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "a INT",
+                                            "b BIGINT",
+                                            "c VARCHAR",
+                                            "d DECIMAL(10,2)",
+                                            "e DOUBLE")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, hi, null, null]",
+                                            "+I[2, 2, hello, null, null]",
+                                            "+I[3, 2, hello world, null, null]")
+                                    .consumedAfterRestore(
+                                            "+I[4, 4, foo, null, null]",
+                                            "+I[5, 2, foo bar, null, null]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t (a, b, c) SELECT a, b, c FROM source_t")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index bbd0aa2b478..7013cd32083 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -121,7 +121,7 @@ public class JavaUserDefinedScalarFunctions {
 
     /** Non-deterministic scalar function. */
     public static class NonDeterministicUdf extends ScalarFunction {
-        Random random = new Random();
+        Random random = new Random(42); // fixed seed so test expectations stay deterministic
 
         public int eval() {
             return random.nextInt();
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/plan/sink-ndf-primary-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/plan/sink-ndf-primary-key.json
new file mode 100644
index 00000000000..5c566d9d3e2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/plan/sink-ndf-primary-key.json
@@ -0,0 +1,123 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 8,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`ndf`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, b, ndf(c) AS EXPR$2])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_c",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "c" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, EXPR$2])"
+  } ],
+  "edges" : [ {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/savepoint/_metadata
new file mode 100644
index 00000000000..27c4f41181d
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/plan/sink-overwrite.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/plan/sink-overwrite.json
new file mode 100644
index 00000000000..a2a09bc5178
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/plan/sink-overwrite.json
@@ -0,0 +1,84 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 4,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Overwrite",
+        "overwrite" : true
+      } ]
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/savepoint/_metadata
new file mode 100644
index 00000000000..5430483d096
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json
new file mode 100644
index 00000000000..6c1b2784b94
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json
@@ -0,0 +1,128 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 11,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : null,
+      "type" : "DECIMAL(10, 2)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : null,
+      "type" : "DOUBLE"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` 
DECIMAL(10, 2), `EXPR$4` DOUBLE>",
+    "description" : "Calc(select=[a, b, c, null:DECIMAL(10, 2) AS EXPR$3, 
null:DOUBLE AS EXPR$4])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "e",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ]
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` 
DECIMAL(10, 2), `EXPR$4` DOUBLE>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
targetColumns=[[0],[1],[2]], fields=[a, b, c, EXPR$3, EXPR$4])"
+  } ],
+  "edges" : [ {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/savepoint/_metadata
new file mode 100644
index 00000000000..1c895c8a2b7
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/plan/sink-partition.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/plan/sink-partition.json
new file mode 100644
index 00000000000..cb4ebaa0e94
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/plan/sink-partition.json
@@ -0,0 +1,126 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 2,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, 2 AS EXPR$1, b, c])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "p",
+              "dataType" : "BIGINT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ "b" ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Partitioning",
+        "partition" : {
+          "b" : "2"
+        }
+      } ]
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/savepoint/_metadata
new file mode 100644
index 00000000000..24aebfe0308
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json
new file mode 100644
index 00000000000..2220fab28ac
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json
@@ -0,0 +1,87 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 6,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "kind" : "METADATA",
+              "dataType" : "VARCHAR(2147483647)",
+              "isVirtual" : false
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "WritingMetadata",
+        "metadataKeys" : [ "c" ],
+        "consumedType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)> 
NOT NULL"
+      } ]
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/savepoint/_metadata
new file mode 100644
index 00000000000..b7874e2a2ef
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/savepoint/_metadata differ

