This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a45cd6cffb [FLINK-35941][table-planner] Add CompiledPlan annotations 
to BatchExecLimit
8a45cd6cffb is described below

commit 8a45cd6cffb7ff55e0af6c1167b255d7da684ff5
Author: James Hughes <[email protected]>
AuthorDate: Thu Aug 8 10:02:09 2024 -0400

    [FLINK-35941][table-planner] Add CompiledPlan annotations to BatchExecLimit
---
 .../plan/nodes/exec/batch/BatchExecLimit.java      |  42 ++++-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   2 +
 .../LimitBatchRestoreTest.java}                    |  13 +-
 .../TableSourceScanBatchRestoreTest.java}          |  15 +-
 .../exec/{stream => common}/LimitTestPrograms.java |   4 +-
 .../TableSourceScanTestPrograms.java               |  20 +--
 .../plan/nodes/exec/stream/LimitRestoreTest.java   |   1 +
 .../exec/stream/TableSourceScanRestoreTest.java    |   1 +
 .../batch-exec-limit_1/limit/plan/limit.json       | 184 +++++++++++++++++++++
 .../plan/table-source-scan-filter-pushdown.json    |  95 +++++++++++
 .../plan/table-source-scan-limit-pushdown.json     | 145 ++++++++++++++++
 .../plan/table-source-scan-partition-pushdown.json | 130 +++++++++++++++
 ...ble-source-scan-project-push-down-disabled.json | 113 +++++++++++++
 .../plan/table-source-scan-project-pushdown.json   |  83 ++++++++++
 .../plan/table-source-scan-reading-metadata.json   |  91 ++++++++++
 .../plan/table-source-scan-reuse-source.json       | 176 ++++++++++++++++++++
 16 files changed, 1087 insertions(+), 28 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLimit.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLimit.java
index d5646cc00ca..509e6d9021d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLimit.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLimit.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -27,20 +28,41 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.operators.sort.LimitOperator;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Collections;
+import java.util.List;
 
 /** Batch {@link ExecNode} for Limit. */
+@ExecNodeMetadata(
+        name = "batch-exec-limit",
+        version = 1,
+        producedTransformations = BatchExecLimit.LIMIT_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecLimit extends ExecNodeBase<RowData>
         implements BatchExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
 
+    public static final String LIMIT_TRANSFORMATION = "limit";
+    public static final String FIELD_NAME_LIMIT_START = "limitStart";
+    public static final String FIELD_NAME_LIMIT_END = "limitEnd";
+    public static final String FIELD_NAME_IS_GLOBAL = "isGlobal";
+
+    @JsonProperty(FIELD_NAME_LIMIT_START)
     private final long limitStart;
+
+    @JsonProperty(FIELD_NAME_LIMIT_END)
     private final long limitEnd;
+
+    @JsonProperty(FIELD_NAME_IS_GLOBAL)
     private final boolean isGlobal;
 
     public BatchExecLimit(
@@ -63,6 +85,23 @@ public class BatchExecLimit extends ExecNodeBase<RowData>
         this.limitEnd = limitEnd;
     }
 
+    @JsonCreator
+    public BatchExecLimit(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_LIMIT_START) long limitStart,
+            @JsonProperty(FIELD_NAME_LIMIT_END) long limitEnd,
+            @JsonProperty(FIELD_NAME_IS_GLOBAL) boolean isGlobal,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) LogicalType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        this.isGlobal = isGlobal;
+        this.limitStart = limitStart;
+        this.limitEnd = limitEnd;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected Transformation<RowData> translateToPlanInternal(
@@ -72,8 +111,7 @@ public class BatchExecLimit extends ExecNodeBase<RowData>
         LimitOperator operator = new LimitOperator(isGlobal, limitStart, 
limitEnd);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(LIMIT_TRANSFORMATION, config),
                 SimpleOperatorFactory.of(operator),
                 inputTransform.getOutputType(),
                 inputTransform.getParallelism(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index ad57d0f49c8..f9069e52cdc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJoin;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
@@ -167,6 +168,7 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecCorrelate.class);
                     add(BatchExecHashJoin.class);
                     add(BatchExecNestedLoopJoin.class);
+                    add(BatchExecLimit.class);
                 }
             };
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/LimitBatchRestoreTest.java
similarity index 75%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/LimitBatchRestoreTest.java
index 9d6180064ff..46322e60131 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/LimitBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.planner.plan.nodes.exec.common.LimitTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecLimit}. */
-public class LimitRestoreTest extends RestoreTestBase {
+/** Restore tests for {@link BatchExecLimit}. */
+public class LimitBatchRestoreTest extends BatchRestoreTestBase {
 
-    public LimitRestoreTest() {
-        super(StreamExecLimit.class);
+    public LimitBatchRestoreTest() {
+        super(BatchExecLimit.class);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/TableSourceScanBatchRestoreTest.java
similarity index 78%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanRestoreTest.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/TableSourceScanBatchRestoreTest.java
index 690d98c3ea7..8afc70c9b75 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/TableSourceScanBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.TableSourceScanTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecTableSourceScan}. */
-public class TableSourceScanRestoreTest extends RestoreTestBase {
+/** Restore tests for {@link BatchExecTableSourceScan}. */
+public class TableSourceScanBatchRestoreTest extends BatchRestoreTestBase {
 
-    public TableSourceScanRestoreTest() {
-        super(StreamExecTableSourceScan.class);
+    public TableSourceScanBatchRestoreTest() {
+        super(BatchExecTableSourceScan.class);
     }
 
     @Override
@@ -40,8 +41,6 @@ public class TableSourceScanRestoreTest extends 
RestoreTestBase {
                 TableSourceScanTestPrograms.LIMIT_PUSHDOWN,
                 TableSourceScanTestPrograms.PARTITION_PUSHDOWN,
                 TableSourceScanTestPrograms.READING_METADATA,
-                TableSourceScanTestPrograms.MULTIPLE_PUSHDOWNS,
-                TableSourceScanTestPrograms.SOURCE_WATERMARK,
                 TableSourceScanTestPrograms.REUSE_SOURCE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/LimitTestPrograms.java
similarity index 95%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/LimitTestPrograms.java
index 0fc8a44695f..05a7fbaaed4 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/LimitTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
@@ -40,7 +40,7 @@ public class LimitTestPrograms {
             };
 
     static final Row[] DATA2 = new Row[] {Row.of(8, "d", 3), Row.of(7, "e", 
2)};
-    static final TableTestProgram LIMIT =
+    public static final TableTestProgram LIMIT =
             TableTestProgram.of("limit", "validates limit node")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSourceScanTestPrograms.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSourceScanTestPrograms.java
index d554c7d78c8..13e2816bb85 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSourceScanTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
@@ -38,7 +38,7 @@ public class TableSourceScanTestPrograms {
         Row.of(5, 2L, "foo bar", 
DateTimeUtils.toLocalDateTime(1586937615000L)),
     };
 
-    static final TableTestProgram PROJECT_PUSHDOWN =
+    public static final TableTestProgram PROJECT_PUSHDOWN =
             TableTestProgram.of(
                             "table-source-scan-project-pushdown",
                             "validates table source scan with project 
pushdown")
@@ -58,7 +58,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, c FROM source_t")
                     .build();
 
-    static final TableTestProgram PROJECT_PUSHDOWN_DISABLED =
+    public static final TableTestProgram PROJECT_PUSHDOWN_DISABLED =
             TableTestProgram.of(
                             "table-source-scan-project-push-down-disabled",
                             "validates table source scan with project pushdown 
disabled")
@@ -84,7 +84,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, c FROM source_t")
                     .build();
 
-    static final TableTestProgram FILTER_PUSHDOWN =
+    public static final TableTestProgram FILTER_PUSHDOWN =
             TableTestProgram.of(
                             "table-source-scan-filter-pushdown",
                             "validates table source scan with filter pushdown")
@@ -105,7 +105,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT * FROM source_t WHERE a 
> 1")
                     .build();
 
-    static final TableTestProgram LIMIT_PUSHDOWN =
+    public static final TableTestProgram LIMIT_PUSHDOWN =
             TableTestProgram.of(
                             "table-source-scan-limit-pushdown",
                             "validates table source scan with limit pushdown")
@@ -124,7 +124,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
LIMIT 2")
                     .build();
 
-    static final TableTestProgram PARTITION_PUSHDOWN =
+    public static final TableTestProgram PARTITION_PUSHDOWN =
             TableTestProgram.of(
                             "table-source-scan-partition-pushdown",
                             "validates table source scan with partition 
pushdown")
@@ -146,7 +146,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE b = 2")
                     .build();
 
-    static final TableTestProgram READING_METADATA =
+    public static final TableTestProgram READING_METADATA =
             TableTestProgram.of(
                             "table-source-scan-reading-metadata",
                             "validates table source scan by reading metadata")
@@ -175,7 +175,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, c, d FROM source_t")
                     .build();
 
-    static final TableTestProgram MULTIPLE_PUSHDOWNS =
+    public static final TableTestProgram MULTIPLE_PUSHDOWNS =
             TableTestProgram.of(
                             "table-source-scan-multiple-pushdowns",
                             "validates table source scan with multiple 
pushdowns")
@@ -204,7 +204,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a FROM source_t WHERE b 
= 2 AND a > 2")
                     .build();
 
-    static final TableTestProgram SOURCE_WATERMARK =
+    public static final TableTestProgram SOURCE_WATERMARK =
             TableTestProgram.of(
                             "table-source-scan-source-watermark",
                             "validates table source scan using source 
watermark")
@@ -232,7 +232,7 @@ public class TableSourceScanTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, c FROM source_t")
                     .build();
 
-    static final TableTestProgram REUSE_SOURCE =
+    public static final TableTestProgram REUSE_SOURCE =
             TableTestProgram.of(
                             "table-source-scan-reuse-source",
                             "validates table source scan by verifying if 
source is resused")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
index 9d6180064ff..a4e744beda1 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.plan.nodes.exec.common.LimitTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanRestoreTest.java
index 690d98c3ea7..ccc5133cabb 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceScanRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.TableSourceScanTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-limit_1/limit/plan/limit.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-limit_1/limit/plan/limit.json
new file mode 100644
index 00000000000..deeebce5381
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-limit_1/limit/plan/limit.json
@@ -0,0 +1,184 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "LimitPushDown",
+        "limit" : 3
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, limit=[3]]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "457b6567-bbde-488f-863a-2a231beeb531",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-limit_1",
+    "limitStart" : 0,
+    "limitEnd" : 3,
+    "isGlobal" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Limit(offset=[0], fetch=[3], global=[false])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Exchange(distribution=[single])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-limit_1",
+    "limitStart" : 0,
+    "limitEnd" : 3,
+    "isGlobal" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Limit(offset=[0], fetch=[3], global=[true])"
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-filter-pushdown/plan/table-source-scan-filter-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-filter-pushdown/plan/table-source-scan-filter-pushdown.json
new file mode 100644
index 00000000000..40a014fdcfb
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-filter-pushdown/plan/table-source-scan-filter-pushdown.json
@@ -0,0 +1,95 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 6,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 0,
+            "type" : "INT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 1,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ]
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[>(a, 1)]]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "d70a4078-d45f-4fa7-a7f7-29078d21ff76",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 7,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-limit-pushdown/plan/table-source-scan-limit-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-limit-pushdown/plan/table-source-scan-limit-pushdown.json
new file mode 100644
index 00000000000..acf2d91abbe
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-limit-pushdown/plan/table-source-scan-limit-pushdown.json
@@ -0,0 +1,145 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 8,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "LimitPushDown",
+        "limit" : 2
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, limit=[2]]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "ba9688f3-d389-463c-8ce2-e3a9a8afc107",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-limit_1",
+    "limitStart" : 0,
+    "limitEnd" : 2,
+    "isGlobal" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Limit(offset=[0], fetch=[2], global=[false])"
+  }, {
+    "id" : 10,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[single])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 11,
+    "type" : "batch-exec-limit_1",
+    "limitStart" : 0,
+    "limitEnd" : 2,
+    "isGlobal" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Limit(offset=[0], fetch=[2], global=[true])"
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-partition-pushdown/plan/table-source-scan-partition-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-partition-pushdown/plan/table-source-scan-partition-pushdown.json
new file mode 100644
index 00000000000..2d4b3a4e012
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-partition-pushdown/plan/table-source-scan-partition-pushdown.json
@@ -0,0 +1,130 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 13,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ "b" ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "PartitionPushDown",
+        "partitions" : [ {
+          "b" : "2"
+        } ]
+      }, {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, partitions=[{b=2}], project=[a, c], metadata=[]]], 
fields=[a, c])",
+    "dynamicFilteringDataListenerID" : "f569c394-0ead-4d5c-95af-e86797bf821f",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 14,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "LITERAL",
+        "value" : 2,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, CAST(2 AS BIGINT) AS b, c])"
+  }, {
+    "id" : 15,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-project-push-down-disabled/plan/table-source-scan-project-push-down-disabled.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-project-push-down-disabled/plan/table-source-scan-project-push-down-disabled.json
new file mode 100644
index 00000000000..c2c1b334b91
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-project-push-down-disabled/plan/table-source-scan-project-push-down-disabled.json
@@ -0,0 +1,113 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 3,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "ts",
+              "kind" : "METADATA",
+              "dataType" : "TIMESTAMP(3)",
+              "isVirtual" : false
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ "ts" ],
+        "producedType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), 
`ts` TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `ts` 
TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, metadata=[ts]]], fields=[a, b, c, ts])",
+    "dynamicFilteringDataListenerID" : "75a35130-8c3e-4b95-a5d0-6d52432992a6",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, c])"
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, c])"
+  } ],
+  "edges" : [ {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-project-pushdown/plan/table-source-scan-project-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-project-pushdown/plan/table-source-scan-project-pushdown.json
new file mode 100644
index 00000000000..2f3114c7f09
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-project-pushdown/plan/table-source-scan-project-pushdown.json
@@ -0,0 +1,83 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[a, c], metadata=[]]], fields=[a, c])",
+    "dynamicFilteringDataListenerID" : "1d526905-ffe3-47ad-9a51-a9ccd4db8d4c",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-reading-metadata/plan/table-source-scan-reading-metadata.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-reading-metadata/plan/table-source-scan-reading-metadata.json
new file mode 100644
index 00000000000..27dfb48fc00
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-reading-metadata/plan/table-source-scan-reading-metadata.json
@@ -0,0 +1,91 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 16,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "kind" : "METADATA",
+              "dataType" : "TIMESTAMP(3)",
+              "isVirtual" : false
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ "d" ],
+        "producedType" : "ROW<`a` INT, `c` VARCHAR(2147483647), `d` 
TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[a, c], metadata=[d]]], fields=[a, c, d])",
+    "dynamicFilteringDataListenerID" : "7af31e86-1964-4d6d-955a-1d4a9f63d947",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 17,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, c, d])"
+  } ],
+  "edges" : [ {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-reuse-source/plan/table-source-scan-reuse-source.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-reuse-source/plan/table-source-scan-reuse-source.json
new file mode 100644
index 00000000000..d8ec45174fd
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-table-source-scan_1/table-source-scan-reuse-source/plan/table-source-scan-reuse-source.json
@@ -0,0 +1,176 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 18,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `d` 
TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c, d])",
+    "dynamicFilteringDataListenerID" : "42816611-3e2a-4ca2-bb3f-ce7d5925316d",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 19,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, c])"
+  }, {
+    "id" : 20,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_one_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_one_t], 
fields=[a, c])"
+  }, {
+    "id" : 21,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT>",
+    "description" : "Calc(select=[a, b])"
+  }, {
+    "id" : 22,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_two_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_two_t], 
fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to