This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new be11c63a21f [FLINK-35923][table-planner] Add CompiledPlan annotations 
to BatchExecSort/BatchExecExchange
be11c63a21f is described below

commit be11c63a21f2fabf1006d7867e9c74bf2b65ca41
Author: James Hughes <[email protected]>
AuthorDate: Wed Aug 7 03:29:42 2024 -0400

    [FLINK-35923][table-planner] Add CompiledPlan annotations to 
BatchExecSort/BatchExecExchange
---
 .../plan/nodes/exec/batch/BatchExecExchange.java   |  38 ++++-
 .../plan/nodes/exec/batch/BatchExecSort.java       |  40 ++++-
 .../plan/nodes/exec/common/CommonExecExchange.java |   2 +
 .../RequiredDistributionJsonDeserializer.java      |  19 +++
 .../serde/RequiredDistributionJsonSerializer.java  |  10 ++
 .../plan/nodes/exec/stream/StreamExecExchange.java |   5 +-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   4 +
 .../SortBatchRestoreTest.java}                     |  14 +-
 .../TableSinkBatchRestoreTest.java}                |  16 +-
 .../exec/{stream => common}/SortTestPrograms.java  |  10 +-
 .../{stream => common}/TableSinkTestPrograms.java  |  26 ++--
 .../nodes/exec/stream/SortLimitRestoreTest.java    |   1 +
 .../plan/nodes/exec/stream/SortRestoreTest.java    |   1 +
 .../nodes/exec/stream/TableSinkRestoreTest.java    |   1 +
 .../sink-bucketing_hash-with-keys-with-count.json  | 118 ++++++++++++++
 ...nk-bucketing_range_with_keys_without_count.json | 117 ++++++++++++++
 .../plan/sink-bucketing_with-count.json            | 118 ++++++++++++++
 .../plan/sink-bucketing_with-keys-and-count.json   | 118 ++++++++++++++
 .../sink-overwrite/plan/sink-overwrite.json        |  81 ++++++++++
 .../plan/sink-partial-insert.json                  | 125 +++++++++++++++
 .../sink-partition/plan/sink-partition.json        | 123 +++++++++++++++
 .../plan/sink-writing-metadata.json                |  84 ++++++++++
 .../batch-exec-sort_1/sort-asc/plan/sort-asc.json  | 169 +++++++++++++++++++++
 .../sort-desc/plan/sort-desc.json                  | 169 +++++++++++++++++++++
 24 files changed, 1366 insertions(+), 43 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
index 5b0f025d047..734c2625a87 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.dag.Transformation;
@@ -38,6 +39,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty.HashDistribution;
 import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty.KeepInputAsIsDistribution;
@@ -47,25 +49,34 @@ import 
org.apache.flink.table.runtime.partitioner.BinaryHashPartitioner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.planner.utils.StreamExchangeModeUtils.getBatchStreamExchangeMode;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/**
- * This {@link ExecNode} represents a change of partitioning of the input 
elements for batch.
- *
- * <p>TODO Remove this class once FLINK-21224 is finished.
- */
+/** This {@link ExecNode} represents a change of partitioning of the input 
elements for batch. */
+@ExecNodeMetadata(
+        name = "batch-exec-exchange",
+        version = 1,
+        producedTransformations = CommonExecExchange.EXCHANGE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecExchange extends CommonExecExchange implements 
BatchExecNode<RowData> {
+    public static final String FIELD_NAME_REQUIRED_EXCHANGE_MODE = 
"requiredExchangeMode";
+
     // the required exchange mode for reusable BatchExecExchange
     // if it's None, use value from configuration
-    @Nullable private StreamExchangeMode requiredExchangeMode;
+    @JsonProperty(FIELD_NAME_REQUIRED_EXCHANGE_MODE)
+    private StreamExchangeMode requiredExchangeMode = 
StreamExchangeMode.UNDEFINED;
 
     public BatchExecExchange(
             ReadableConfig tableConfig,
@@ -81,6 +92,20 @@ public class BatchExecExchange extends CommonExecExchange 
implements BatchExecNo
                 description);
     }
 
+    @JsonCreator
+    public BatchExecExchange(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
+            @JsonProperty(FIELD_NAME_REQUIRED_EXCHANGE_MODE)
+                    StreamExchangeMode requiredExchangeMode) {
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        this.requiredExchangeMode = requiredExchangeMode;
+    }
+
     public void setRequiredExchangeMode(@Nullable StreamExchangeMode 
requiredExchangeMode) {
         this.requiredExchangeMode = requiredExchangeMode;
     }
@@ -199,6 +224,7 @@ public class BatchExecExchange extends CommonExecExchange 
implements BatchExecNo
                         : getBatchStreamExchangeMode(config, 
requiredExchangeMode);
         final Transformation<RowData> transformation =
                 new PartitionTransformation<>(inputTransform, partitioner, 
exchangeMode);
+        createTransformationMeta(EXCHANGE_TRANSFORMATION, 
config).fill(transformation);
         transformation.setParallelism(parallelism);
         transformation.setOutputType(InternalTypeInfo.of(getOutputType()));
         return transformation;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
index 929d6ed60ba..a559b597bbf 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -29,6 +30,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
@@ -37,16 +39,37 @@ import 
org.apache.flink.table.runtime.operators.sort.SortOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Collections;
+import java.util.List;
 
 /**
  * {@link BatchExecNode} for Sort without limit.
  *
  * <p>This node will output all data rather than `limit` records.
  */
+@ExecNodeMetadata(
+        name = "batch-exec-sort",
+        version = 1,
+        producedTransformations = {BatchExecSort.SORT_TRANSFORMATION},
+        consumedOptions = {
+            "table.exec.sort.max-num-file-handles",
+            "table.exec.sort.async-merge-enabled",
+            "table.exec.spill-compression.enabled",
+            "table.exec.spill-compression.block-size",
+            "table.exec.resource.sort.memory"
+        },
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecSort extends ExecNodeBase<RowData>
         implements BatchExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
+    public static final String SORT_TRANSFORMATION = "sort";
+
+    public static final String FIELD_NAME_SORT_SPEC = "sortSpec";
 
+    @JsonProperty(FIELD_NAME_SORT_SPEC)
     private final SortSpec sortSpec;
 
     public BatchExecSort(
@@ -65,6 +88,19 @@ public class BatchExecSort extends ExecNodeBase<RowData>
         this.sortSpec = sortSpec;
     }
 
+    @JsonCreator
+    public BatchExecSort(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        this.sortSpec = sortSpec;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected Transformation<RowData> translateToPlanInternal(
@@ -92,10 +128,10 @@ public class BatchExecSort extends ExecNodeBase<RowData>
                         
config.get(ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
         long sortMemory =
                 
config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(SORT_TRANSFORMATION, config),
                 SimpleOperatorFactory.of(operator),
                 InternalTypeInfo.of((RowType) getOutputType()),
                 inputTransform.getParallelism(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExchange.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExchange.java
index 809bcf94f27..dde513e1597 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExchange.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecExchange.java
@@ -36,6 +36,8 @@ import java.util.List;
 public abstract class CommonExecExchange extends ExecNodeBase<RowData>
         implements SingleTransformationTranslator<RowData> {
 
+    public static final String EXCHANGE_TRANSFORMATION = "exchange";
+
     public CommonExecExchange(
             int id,
             ExecNodeContext context,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
index cbbb7a65041..f6b62483455 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
@@ -69,6 +69,25 @@ final class RequiredDistributionJsonDeserializer extends 
StdDeserializer<Require
                     keys[i] = keysNode.get(i).asInt();
                 }
                 return InputProperty.hashDistribution(keys);
+            case KEEP_INPUT_AS_IS:
+                JsonNode inputDistributionNode = 
jsonNode.get("inputDistribution");
+                if (inputDistributionNode == null) {
+                    throw new TableException(
+                            "KeepInputAsIs distribution requires non-empty "
+                                    + "inputDistribution field.");
+                }
+                RequiredDistribution inputDistribution =
+                        inputDistributionNode
+                                .traverse(jsonParser.getCodec())
+                                .readValueAs(RequiredDistribution.class);
+
+                JsonNode isStrictNode = jsonNode.get("isStrict");
+                if (isStrictNode == null) {
+                    throw new TableException(
+                            "KeepInputAsIs distribution requires non-empty 
isStrict field.");
+                }
+                boolean isStrict = isStrictNode.asBoolean();
+                return 
InputProperty.keepInputAsIsDistribution(inputDistribution, isStrict);
             default:
                 throw new TableException("Unsupported distribution type: " + 
type);
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
index 1817557526b..93898a34e9b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
 import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty.HashDistribution;
+import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty.KeepInputAsIsDistribution;
 import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -59,6 +60,15 @@ final class RequiredDistributionJsonSerializer extends 
StdSerializer<RequiredDis
             case UNKNOWN:
                 // do nothing, type name is enough
                 break;
+            case KEEP_INPUT_AS_IS:
+                KeepInputAsIsDistribution asisDistribution =
+                        (KeepInputAsIsDistribution) requiredDistribution;
+                jsonGenerator.writeFieldName("inputDistribution");
+                serialize(
+                        asisDistribution.getInputDistribution(), 
jsonGenerator, serializerProvider);
+                jsonGenerator.writeFieldName("isStrict");
+                jsonGenerator.writeBoolean(asisDistribution.isStrict());
+                break;
             case HASH:
                 HashDistribution hashDistribution = (HashDistribution) 
requiredDistribution;
                 jsonGenerator.writeFieldName("keys");
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
index 3d5cf3f13ca..3664e1968fa 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
@@ -58,13 +58,10 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 @ExecNodeMetadata(
         name = "stream-exec-exchange",
         version = 1,
-        producedTransformations = StreamExecExchange.EXCHANGE_TRANSFORMATION,
+        producedTransformations = CommonExecExchange.EXCHANGE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
 public class StreamExecExchange extends CommonExecExchange implements 
StreamExecNode<RowData> {
-
-    public static final String EXCHANGE_TRANSFORMATION = "exchange";
-
     public StreamExecExchange(
             ReadableConfig tableConfig,
             InputProperty inputProperty,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 99b4db795a2..ffc4da4a6de 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -29,7 +29,9 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
@@ -155,6 +157,8 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecSink.class);
                     add(BatchExecTableSourceScan.class);
                     add(BatchExecCalc.class);
+                    add(BatchExecExchange.class);
+                    add(BatchExecSort.class);
                 }
             };
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortBatchRestoreTest.java
similarity index 72%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortBatchRestoreTest.java
index 18e9792f9ed..22f0fa4846a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortBatchRestoreTest.java
@@ -16,19 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.planner.plan.nodes.exec.common.SortTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecSort}. */
-public class SortRestoreTest extends RestoreTestBase {
+/** Restore tests for {@link BatchExecSort}. */
+public class SortBatchRestoreTest extends BatchRestoreTestBase {
 
-    public SortRestoreTest() {
-        super(StreamExecSort.class, AfterRestoreSource.NO_RESTORE);
+    public SortBatchRestoreTest() {
+        super(BatchExecSort.class, 
Collections.singletonList(BatchExecExchange.class));
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/TableSinkBatchRestoreTest.java
similarity index 76%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/TableSinkBatchRestoreTest.java
index 4d888d3e283..f70dc1a9059 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/TableSinkBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.TableSinkTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecSink}. */
-public class TableSinkRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecSink}. */
+public class TableSinkBatchRestoreTest extends BatchRestoreTestBase {
 
-    public TableSinkRestoreTest() {
-        super(StreamExecSink.class);
+    public TableSinkBatchRestoreTest() {
+        super(BatchExecSink.class);
     }
 
     @Override
@@ -41,7 +42,8 @@ public class TableSinkRestoreTest extends RestoreTestBase {
                 TableSinkTestPrograms.SINK_PARTITION,
                 TableSinkTestPrograms.SINK_OVERWRITE,
                 TableSinkTestPrograms.SINK_WRITING_METADATA,
-                TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY,
+                // Test needs materialization or other update
+                // TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY,
                 TableSinkTestPrograms.SINK_PARTIAL_INSERT);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
similarity index 97%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
index 2959e2e6a0e..c0e39fb1469 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.planner.utils.InternalConfigOptions;
 import org.apache.flink.table.test.program.SinkTestStep;
@@ -40,7 +40,7 @@ public class SortTestPrograms {
         Row.of(5, "c", 9)
     };
 
-    static final TableTestProgram SORT_LIMIT_ASC =
+    public static final TableTestProgram SORT_LIMIT_ASC =
             TableTestProgram.of(
                             "sort-limit-asc",
                             "validates sort limit node by sorting integers in 
asc mode")
@@ -88,7 +88,7 @@ public class SortTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT * from source_t ORDER 
BY a LIMIT 3")
                     .build();
 
-    static final TableTestProgram SORT_LIMIT_DESC =
+    public static final TableTestProgram SORT_LIMIT_DESC =
             TableTestProgram.of(
                             "sort-limit-desc",
                             "validates sort limit node by sorting integers in 
desc mode")
@@ -126,7 +126,7 @@ public class SortTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT * from source_t ORDER 
BY a DESC LIMIT 3")
                     .build();
 
-    static final TableTestProgram SORT_ASC =
+    public static final TableTestProgram SORT_ASC =
             TableTestProgram.of("sort-asc", "validates sort node by sorting 
integers in asc mode")
                     
.setupConfig(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
                     .setupTableSource(
@@ -148,7 +148,7 @@ public class SortTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT * from source_t ORDER 
BY a")
                     .build();
 
-    static final TableTestProgram SORT_DESC =
+    public static final TableTestProgram SORT_DESC =
             TableTestProgram.of("sort-desc", "validates sort node by sorting 
integers in desc mode")
                     
.setupConfig(InternalConfigOptions.TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED, true)
                     .setupTableSource(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
similarity index 91%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
index 0a42e821e98..3b8c043b4c8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.catalog.TableDistribution;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
@@ -30,23 +30,23 @@ import java.util.Arrays;
 /** {@link TableTestProgram} definitions for testing {@link 
StreamExecDeduplicate}. */
 public class TableSinkTestPrograms {
 
-    static final Row[] BEFORE_DATA = {
+    public static final Row[] BEFORE_DATA = {
         Row.of(1, 1L, "hi"), Row.of(2, 2L, "hello"), Row.of(3, 2L, "hello 
world")
     };
 
-    static final Row[] AFTER_DATA = {Row.of(4, 4L, "foo"), Row.of(5, 2L, "foo 
bar")};
+    public static final Row[] AFTER_DATA = {Row.of(4, 4L, "foo"), Row.of(5, 
2L, "foo bar")};
 
-    static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c VARCHAR"};
+    public static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c 
VARCHAR"};
 
-    static final TableTestProgram SINK_BUCKETING_WITH_COUNT =
+    public static final TableTestProgram SINK_BUCKETING_WITH_COUNT =
             buildBucketingTest("with-count", TableDistribution.ofUnknown(3));
-    static final TableTestProgram SINK_BUCKETING_WITH_KEYS_AND_COUNT =
+    public static final TableTestProgram SINK_BUCKETING_WITH_KEYS_AND_COUNT =
             buildBucketingTest(
                     "with-keys-and-count", 
TableDistribution.ofUnknown(Arrays.asList("a"), 3));
-    static final TableTestProgram SINK_BUCKETING_HASH_WITH_KEYS_AND_COUNT =
+    public static final TableTestProgram 
SINK_BUCKETING_HASH_WITH_KEYS_AND_COUNT =
             buildBucketingTest(
                     "hash-with-keys-with-count", 
TableDistribution.ofHash(Arrays.asList("a"), 3));
-    static final TableTestProgram 
SINK_BUCKETING_HASH_WITH_KEYS_AND_WITHOUT_COUNT =
+    public static final TableTestProgram 
SINK_BUCKETING_HASH_WITH_KEYS_AND_WITHOUT_COUNT =
             buildBucketingTest(
                     "range_with_keys_without_count",
                     TableDistribution.ofHash(Arrays.asList("a"), null));
@@ -73,7 +73,7 @@ public class TableSinkTestPrograms {
                 .build();
     }
 
-    static final TableTestProgram SINK_PARTITION =
+    public static final TableTestProgram SINK_PARTITION =
             TableTestProgram.of("sink-partition", "validates sink partition")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
@@ -97,7 +97,7 @@ public class TableSinkTestPrograms {
                     .runSql("INSERT INTO sink_t PARTITION (b=2) SELECT * FROM 
source_t")
                     .build();
 
-    static final TableTestProgram SINK_OVERWRITE =
+    public static final TableTestProgram SINK_OVERWRITE =
             TableTestProgram.of("sink-overwrite", "validates sink with 
overwrite")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
@@ -116,7 +116,7 @@ public class TableSinkTestPrograms {
                                     .build())
                     .runSql("INSERT OVERWRITE sink_t SELECT * FROM source_t")
                     .build();
-    static final TableTestProgram SINK_WRITING_METADATA =
+    public static final TableTestProgram SINK_WRITING_METADATA =
             TableTestProgram.of("sink-writing-metadata", "validates writing 
metadata to sink")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
@@ -137,7 +137,7 @@ public class TableSinkTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT * FROM source_t")
                     .build();
 
-    static final TableTestProgram SINK_NDF_PRIMARY_KEY =
+    public static final TableTestProgram SINK_NDF_PRIMARY_KEY =
             TableTestProgram.of(
                             "sink-ndf-primary-key",
                             "validates sink with ndf and different primary 
key")
@@ -167,7 +167,7 @@ public class TableSinkTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, b, ndf(c) FROM 
source_t")
                     .build();
 
-    static final TableTestProgram SINK_PARTIAL_INSERT =
+    public static final TableTestProgram SINK_PARTIAL_INSERT =
             TableTestProgram.of("sink-partial-insert", "validates sink with 
partial insert")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java
index 58e53435318..44c5aa7cb37 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortLimitRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.plan.nodes.exec.common.SortTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
index 18e9792f9ed..9726c8f670d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.plan.nodes.exec.common.SortTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
index 4d888d3e283..7c52680a27a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.TableSinkTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json
new file mode 100644
index 00000000000..6f059a252ed
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json
@@ -0,0 +1,118 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 7,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "b2b7db2f-1758-4338-8f93-1470febd2591",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sort(orderBy=[b ASC])"
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "distribution" : {
+            "kind" : "HASH",
+            "bucketCount" : 3,
+            "bucketKeys" : [ "a" ]
+          },
+          "partitionKeys" : [ "b" ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json
new file mode 100644
index 00000000000..55c175f06fb
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json
@@ -0,0 +1,117 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 10,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "c888e576-e89a-4617-aa71-df8882e391f5",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 11,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sort(orderBy=[b ASC])"
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "distribution" : {
+            "kind" : "HASH",
+            "bucketKeys" : [ "a" ]
+          },
+          "partitionKeys" : [ "b" ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_with-count/plan/sink-bucketing_with-count.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_with-count/plan/sink-bucketing_with-count.json
new file mode 100644
index 00000000000..0c1a738f398
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_with-count/plan/sink-bucketing_with-count.json
@@ -0,0 +1,118 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "a02f5517-b8b6-4ab8-9f58-1ebbf040f628",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sort(orderBy=[b ASC])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "distribution" : {
+            "kind" : "UNKNOWN",
+            "bucketCount" : 3,
+            "bucketKeys" : [ ]
+          },
+          "partitionKeys" : [ "b" ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json
new file mode 100644
index 00000000000..fce26b35a90
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json
@@ -0,0 +1,118 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 4,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "1770cef5-14e5-4ca7-9813-2152a3fb194a",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sort(orderBy=[b ASC])"
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "distribution" : {
+            "kind" : "UNKNOWN",
+            "bucketCount" : 3,
+            "bucketKeys" : [ "a" ]
+          },
+          "partitionKeys" : [ "b" ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-overwrite/plan/sink-overwrite.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-overwrite/plan/sink-overwrite.json
new file mode 100644
index 00000000000..25870b8d445
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-overwrite/plan/sink-overwrite.json
@@ -0,0 +1,81 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 16,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "f1099093-b51f-4d4b-97dc-ecce2a719dae",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 17,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Overwrite",
+        "overwrite" : true
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json
new file mode 100644
index 00000000000..bc56293e523
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json
@@ -0,0 +1,125 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 20,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "beb3b16f-f2a0-41fe-9de1-fab0985203d9",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 21,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : null,
+      "type" : "DECIMAL(10, 2)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : null,
+      "type" : "DOUBLE"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` 
DECIMAL(10, 2), `EXPR$4` DOUBLE>",
+    "description" : "Calc(select=[a, b, c, null:DECIMAL(10, 2) AS EXPR$3, 
null:DOUBLE AS EXPR$4])"
+  }, {
+    "id" : 22,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "e",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` 
DECIMAL(10, 2), `EXPR$4` DOUBLE>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
targetColumns=[[0],[1],[2]], fields=[a, b, c, EXPR$3, EXPR$4])"
+  } ],
+  "edges" : [ {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-partition/plan/sink-partition.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-partition/plan/sink-partition.json
new file mode 100644
index 00000000000..06ab61b1807
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-partition/plan/sink-partition.json
@@ -0,0 +1,123 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 13,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "f2ed12f3-98dd-488c-ba66-a3bc70145df1",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 14,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 2,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, 2 AS EXPR$1, b, c])"
+  }, {
+    "id" : 15,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "p",
+              "dataType" : "BIGINT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ "b" ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "Partitioning",
+        "partition" : {
+          "b" : "2"
+        }
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json
new file mode 100644
index 00000000000..72a2d796243
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json
@@ -0,0 +1,84 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 18,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "dedcb38e-95f0-4eb8-b6a5-dbafcb8cebe9",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 19,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "kind" : "METADATA",
+              "dataType" : "VARCHAR(2147483647)",
+              "isVirtual" : false
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "WritingMetadata",
+        "metadataKeys" : [ "c" ],
+        "consumedType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)> 
NOT NULL"
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort_1/sort-asc/plan/sort-asc.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort_1/sort-asc/plan/sort-asc.json
new file mode 100644
index 00000000000..f82503add2e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort_1/sort-asc/plan/sort-asc.json
@@ -0,0 +1,169 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "f8f5519a-a5e2-4090-814c-9e47572c88f4",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Exchange(distribution=[single])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Sort(orderBy=[a ASC])"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort_1/sort-desc/plan/sort-desc.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort_1/sort-desc/plan/sort-desc.json
new file mode 100644
index 00000000000..999fec6364a
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort_1/sort-desc/plan/sort-desc.json
@@ -0,0 +1,169 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 6,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "ccc3d77c-104e-453c-8d1f-b57c734221e3",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 7,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Exchange(distribution=[single])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : false,
+        "nullIsLast" : true
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Sort(orderBy=[a DESC])"
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+  }, {
+    "id" : 10,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to