This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0050e426bc0 [FLINK-35799][table] Add CompiledPlan annotations to BatchExecCalc
0050e426bc0 is described below

commit 0050e426bc0640858c539d43032a4226eea479e5
Author: James Hughes <[email protected]>
AuthorDate: Mon Jul 15 10:41:15 2024 -0400

    [FLINK-35799][table] Add CompiledPlan annotations to BatchExecCalc
---
 .../flink/table/test/program/SinkTestStep.java     |   2 +-
 .../planner/plan/ExecNodeGraphInternalPlan.java    |   6 +-
 .../plan/nodes/exec/batch/BatchExecCalc.java       |  34 +++
 .../plan/nodes/exec/batch/BatchExecSink.java       |  40 +++
 .../nodes/exec/batch/BatchExecTableSourceScan.java |  30 +++
 .../plan/nodes/exec/common/CommonExecSink.java     |   3 +-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |  12 +-
 .../table/planner/delegation/BatchPlanner.scala    |  15 +-
 .../table/planner/delegation/PlannerBase.scala     |  59 ++++-
 .../table/planner/delegation/StreamPlanner.scala   |  53 ----
 .../apache/flink/table/api/CompiledPlanITCase.java |  23 +-
 .../CalcBatchRestoreTest.java}                     |  13 +-
 .../exec/{stream => common}/CalcTestPrograms.java  |  16 +-
 .../plan/nodes/exec/stream/CalcRestoreTest.java    |   1 +
 .../planner/plan/nodes/exec/stream/MiscTests.java  |   2 +-
 ...toreTestBase.java => BatchRestoreTestBase.java} | 190 +++------------
 .../plan/nodes/exec/testutils/RestoreTestBase.java |   2 +-
 .../plan/calc-filter-pushdown.json                 |  89 +++++++
 .../calc-filter/plan/calc-filter.json              | 138 +++++++++++
 .../plan/calc-project-pushdown.json                | 132 ++++++++++
 .../calc-sarg/plan/calc-sarg.json                  | 129 ++++++++++
 .../calc-simple/plan/calc-simple.json              | 110 +++++++++
 .../calc-udf-complex/plan/calc-udf-complex.json    | 269 +++++++++++++++++++++
 .../calc-udf-simple/plan/calc-udf-simple.json      | 108 +++++++++
 24 files changed, 1212 insertions(+), 264 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
index 5d57e6095b5..f708adaeab3 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
@@ -110,7 +110,7 @@ public final class SinkTestStep extends TableTestStep {
                         : TestKind.SINK_WITH_RESTORE_DATA;
     }
 
-    public boolean getTestChangelogData() {
+    public boolean shouldTestChangelogData() {
         return testChangelogData;
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
index b921535b8b8..59e587b3f27 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.delegation.InternalPlan;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
 
 import java.io.File;
 import java.io.IOException;
@@ -102,10 +102,10 @@ public class ExecNodeGraphInternalPlan implements InternalPlan {
     @Override
     public List<String> getSinkIdentifiers() {
         return this.execNodeGraph.getRootNodes().stream()
-                .filter(execNode -> execNode instanceof StreamExecSink)
+                .filter(execNode -> execNode instanceof CommonExecSink)
                 .map(
                         execNode ->
-                                ((StreamExecSink) execNode)
+                                ((CommonExecSink) execNode)
                                         .getTableSinkSpec()
                                         .getContextResolvedTable()
                                         .getIdentifier())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
index 46ec2f0faef..ab9dbca87f1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
@@ -28,12 +29,16 @@ import org.apache.flink.table.planner.plan.fusion.spec.CalcFusionCodegenSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexNode;
 
 import javax.annotation.Nullable;
@@ -43,6 +48,12 @@ import java.util.List;
 import java.util.Optional;
 
 /** Batch {@link ExecNode} for Calc. */
+@ExecNodeMetadata(
+        name = "batch-exec-calc",
+        version = 1,
+        producedTransformations = CommonExecCalc.CALC_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecCalc extends CommonExecCalc implements BatchExecNode<RowData> {
 
     public BatchExecCalc(
@@ -65,6 +76,29 @@ public class BatchExecCalc extends CommonExecCalc implements BatchExecNode<RowDa
                 description);
     }
 
+    @JsonCreator
+    public BatchExecCalc(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
+            @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(
+                id,
+                context,
+                persistedConfig,
+                projection,
+                condition,
+                TableStreamOperator.class,
+                false, // retainHeader
+                inputProperties,
+                outputType,
+                description);
+    }
+
     public boolean supportFusionCodegen() {
         return true;
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index 9a28b1ccf7f..c9a6e8c3c63 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.Column;
@@ -33,12 +34,16 @@ import 
org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -46,6 +51,20 @@ import java.util.List;
 /**
  * Batch {@link ExecNode} to write data into an external sink defined by a 
{@link DynamicTableSink}.
  */
+@ExecNodeMetadata(
+        name = "batch-exec-sink",
+        version = 1,
+        consumedOptions = {
+            "table.exec.sink.not-null-enforcer",
+            "table.exec.sink.type-length-enforcer",
+        },
+        producedTransformations = {
+            CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
+            CommonExecSink.PARTITIONER_TRANSFORMATION,
+            CommonExecSink.SINK_TRANSFORMATION
+        },
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecSink extends CommonExecSink implements 
BatchExecNode<Object> {
     public BatchExecSink(
             ReadableConfig tableConfig,
@@ -65,6 +84,27 @@ public class BatchExecSink extends CommonExecSink implements 
BatchExecNode<Objec
                 description);
     }
 
+    @JsonCreator
+    public BatchExecSink(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_DYNAMIC_TABLE_SINK) DynamicTableSinkSpec 
tableSinkSpec,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) LogicalType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(
+                id,
+                context,
+                persistedConfig,
+                tableSinkSpec,
+                ChangelogMode.insertOnly(),
+                true, // isBounded
+                inputProperties,
+                outputType,
+                description);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected Transformation<Object> translateToPlanInternal(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
index e982477f7ab..f11433ff62c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
@@ -29,6 +30,7 @@ import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
@@ -37,6 +39,9 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Collections;
 import java.util.UUID;
 
@@ -44,6 +49,12 @@ import java.util.UUID;
  * Batch {@link ExecNode} to read data from an external source defined by a 
bounded {@link
  * ScanTableSource}.
  */
+@ExecNodeMetadata(
+        name = "batch-exec-table-source-scan",
+        version = 1,
+        producedTransformations = 
CommonExecTableSourceScan.SOURCE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecTableSourceScan extends CommonExecTableSourceScan
         implements BatchExecNode<RowData> {
 
@@ -86,6 +97,25 @@ public class BatchExecTableSourceScan extends 
CommonExecTableSourceScan
         this.tableConfig = tableConfig;
     }
 
+    @JsonCreator
+    public BatchExecTableSourceScan(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig tableConfig,
+            @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec 
tableSourceSpec,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(
+                id,
+                context,
+                tableConfig,
+                tableSourceSpec,
+                Collections.emptyList(),
+                outputType,
+                description);
+        this.tableConfig = tableConfig;
+    }
+
     public String getDynamicFilteringDataListenerID() {
         return dynamicFilteringDataListenerID;
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index e6179dbaa58..e58de91048e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -66,7 +66,6 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import 
org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
@@ -568,7 +567,7 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
 
     private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
-            if (this instanceof StreamExecNode && config.shouldSetUid()) {
+            if (config.shouldSetUid()) {
                 return Optional.of(createTransformationUid(name, config));
             }
             return Optional.empty();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 595cded6c08..99b4db795a2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -28,6 +28,9 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -148,6 +151,10 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecPythonGroupAggregate.class);
                     add(StreamExecPythonGroupWindowAggregate.class);
                     add(StreamExecPythonOverAggregate.class);
+                    // Batch execution mode
+                    add(BatchExecSink.class);
+                    add(BatchExecTableSourceScan.class);
+                    add(BatchExecCalc.class);
                 }
             };
 
@@ -198,8 +205,9 @@ public final class ExecNodeMetadataUtil {
     }
 
     public static <T extends ExecNode<?>> boolean isUnsupported(Class<T> 
execNode) {
-        return !StreamExecNode.class.isAssignableFrom(execNode)
-                || UNSUPPORTED_JSON_SERDE_CLASSES.contains(execNode);
+        boolean streamOrKnownExecNode =
+                StreamExecNode.class.isAssignableFrom(execNode) || 
execNodes().contains(execNode);
+        return !streamOrKnownExecNode || 
UNSUPPORTED_JSON_SERDE_CLASSES.contains(execNode);
     }
 
     public static void addTestNode(Class<? extends ExecNode<?>> execNodeClass) 
{
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index bb4c1b75a28..120c5f70b6f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.delegation
 import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ExecutionOptions
-import org.apache.flink.table.api.{ExplainDetail, ExplainFormat, 
PlanReference, TableConfig, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.delegation.{Executor, InternalPlan}
@@ -163,19 +163,6 @@ class BatchPlanner(
       classLoader)
   }
 
-  override def loadPlan(planReference: PlanReference): InternalPlan = {
-    throw new UnsupportedOperationException(
-      "The compiled plan feature is not supported in batch mode.")
-  }
-
-  override def compilePlan(modifyOperations: util.List[ModifyOperation]): 
InternalPlan =
-    throw new UnsupportedOperationException(
-      "The compiled plan feature is not supported in batch mode.")
-
-  override def translatePlan(plan: InternalPlan): util.List[Transformation[_]] 
=
-    throw new UnsupportedOperationException(
-      "The compiled plan feature is not supported in batch mode.")
-
   override def explainPlan(plan: InternalPlan, extraDetails: ExplainDetail*): 
String =
     throw new UnsupportedOperationException(
       "The compiled plan feature is not supported in batch mode.")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 45788e6278e..55ef54a5bcd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -19,14 +19,16 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.dag.Transformation
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.graph.StreamGraph
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.PlanReference.{ContentPlanReference, 
FilePlanReference, ResourcePlanReference}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
-import org.apache.flink.table.delegation.{Executor, Parser, ParserFactory, 
Planner}
+import org.apache.flink.table.delegation._
 import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, 
TableFactoryUtil}
 import org.apache.flink.table.module.{Module, ModuleManager}
 import org.apache.flink.table.operations._
@@ -38,10 +40,11 @@ import 
org.apache.flink.table.planner.connectors.DynamicSinkUtils
 import 
org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
+import org.apache.flink.table.planner.plan.ExecNodeGraphInternalPlan
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNodeGraph, 
ExecNodeGraphGenerator}
 import 
org.apache.flink.table.planner.plan.nodes.exec.processor.{ExecNodeGraphProcessor,
 ProcessorContext}
-import org.apache.flink.table.planner.plan.nodes.exec.serde.SerdeContext
+import org.apache.flink.table.planner.plan.nodes.exec.serde.{JsonSerdeUtil, 
SerdeContext}
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
 import org.apache.flink.table.planner.plan.optimize.Optimizer
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -60,6 +63,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalTableModify
 
+import java.io.{File, IOException}
 import java.lang.{Long => JLong}
 import java.util
 import java.util.{Collections, TimeZone}
@@ -182,6 +186,57 @@ abstract class PlannerBase(
     transformations
   }
 
+  override def loadPlan(planReference: PlanReference): InternalPlan = {
+    val ctx = createSerdeContext
+    val objectReader: ObjectReader = JsonSerdeUtil.createObjectReader(ctx)
+    val execNodeGraph = planReference match {
+      case filePlanReference: FilePlanReference =>
+        objectReader.readValue(filePlanReference.getFile, 
classOf[ExecNodeGraph])
+      case contentPlanReference: ContentPlanReference =>
+        objectReader.readValue(contentPlanReference.getContent, 
classOf[ExecNodeGraph])
+      case resourcePlanReference: ResourcePlanReference =>
+        val url = resourcePlanReference.getClassLoader
+          .getResource(resourcePlanReference.getResourcePath)
+        if (url == null) {
+          throw new IOException("Cannot load the plan reference from 
classpath: " + planReference)
+        }
+        objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph])
+      case _ =>
+        throw new IllegalStateException(
+          "Unknown PlanReference. This is a bug, please contact the 
developers")
+    }
+    compileExecNodeGraphToInternalPlan(ctx, execNodeGraph)
+  }
+
+  override def compilePlan(modifyOperations: util.List[ModifyOperation]): 
InternalPlan = {
+    beforeTranslation()
+    val relNodes = modifyOperations.map(translateToRel)
+    val optimizedRelNodes = optimize(relNodes)
+    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = 
true)
+    afterTranslation()
+    compileExecNodeGraphToInternalPlan(createSerdeContext, execGraph)
+  }
+
+  override def translatePlan(plan: InternalPlan): util.List[Transformation[_]] 
= {
+    beforeTranslation()
+    val execGraph = 
plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
+    val transformations = translateToPlan(execGraph)
+    afterTranslation()
+    transformations
+  }
+
+  private def compileExecNodeGraphToInternalPlan(
+      ctx: SerdeContext,
+      execNodeGraph: ExecNodeGraph) = {
+    new ExecNodeGraphInternalPlan(
+      () =>
+        JsonSerdeUtil
+          .createObjectWriter(ctx)
+          .withDefaultPrettyPrinter()
+          .writeValueAsString(execNodeGraph),
+      execNodeGraph)
+  }
+
   /** Converts a relational tree of [[ModifyOperation]] into a Calcite 
relational expression. */
   @VisibleForTesting
   private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode 
= {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index fb32326f117..f553c62761e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -169,59 +169,6 @@ class StreamPlanner(
       classLoader)
   }
 
-  override def loadPlan(planReference: PlanReference): InternalPlan = {
-    val ctx = createSerdeContext
-    val objectReader: ObjectReader = JsonSerdeUtil.createObjectReader(ctx)
-    val execNodeGraph = planReference match {
-      case filePlanReference: FilePlanReference =>
-        objectReader.readValue(filePlanReference.getFile, 
classOf[ExecNodeGraph])
-      case contentPlanReference: ContentPlanReference =>
-        objectReader.readValue(contentPlanReference.getContent, 
classOf[ExecNodeGraph])
-      case resourcePlanReference: ResourcePlanReference =>
-        val url = resourcePlanReference.getClassLoader
-          .getResource(resourcePlanReference.getResourcePath)
-        if (url == null) {
-          throw new IOException("Cannot load the plan reference from 
classpath: " + planReference)
-        }
-        objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph])
-      case _ =>
-        throw new IllegalStateException(
-          "Unknown PlanReference. This is a bug, please contact the 
developers")
-    }
-
-    new ExecNodeGraphInternalPlan(
-      // ensures that the JSON output is always normalized
-      () =>
-        JsonSerdeUtil
-          .createObjectWriter(ctx)
-          .withDefaultPrettyPrinter()
-          .writeValueAsString(execNodeGraph),
-      execNodeGraph)
-  }
-
-  override def compilePlan(modifyOperations: util.List[ModifyOperation]): 
InternalPlan = {
-    beforeTranslation()
-    val relNodes = modifyOperations.map(translateToRel)
-    val optimizedRelNodes = optimize(relNodes)
-    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = 
true)
-    afterTranslation()
-
-    val compiledJson = JsonSerdeUtil
-      .createObjectWriter(createSerdeContext)
-      .withDefaultPrettyPrinter()
-      .writeValueAsString(execGraph)
-
-    new ExecNodeGraphInternalPlan(() => compiledJson, execGraph)
-  }
-
-  override def translatePlan(plan: InternalPlan): util.List[Transformation[_]] 
= {
-    beforeTranslation()
-    val execGraph = 
plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
-    val transformations = translateToPlan(execGraph)
-    afterTranslation()
-    transformations
-  }
-
   override def explainPlan(plan: InternalPlan, extraDetails: ExplainDetail*): 
String = {
     beforeTranslation()
     val execGraph = 
plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 591f0a52e4a..514a57b9c5d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -404,28 +404,13 @@ class CompiledPlanITCase extends JsonPlanTestBase {
     }
 
     @Test
-    void testBatchMode() {
+    void testCompileAndExecutePlanBatchMode() throws Exception {
         tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
 
-        String srcTableDdl =
-                "CREATE TABLE src (\n"
-                        + "  a bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'true')";
-        tableEnv.executeSql(srcTableDdl);
-
-        String sinkTableDdl =
-                "CREATE TABLE sink (\n"
-                        + "  a bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tableEnv.executeSql(sinkTableDdl);
+        File sinkPath = createSourceSinkTables();
 
-        assertThatThrownBy(() -> tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src"))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessage("The compiled plan feature is not supported in batch mode.");
+        tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src").execute().await();
+        assertResult(DATA, sinkPath);
     }
 
     private File createSourceSinkTables() throws IOException {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcBatchRestoreTest.java
similarity index 80%
copy from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
copy to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcBatchRestoreTest.java
index 49585d85ccb..095b4d411ca 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecCalc}. */
-public class CalcRestoreTest extends RestoreTestBase {
+/** Restore tests for {@link BatchExecCalc}. */
+public class CalcBatchRestoreTest extends BatchRestoreTestBase {
 
-    public CalcRestoreTest() {
-        super(StreamExecCalc.class);
+    public CalcBatchRestoreTest() {
+        super(BatchExecCalc.class);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
index e6606c279e9..0ccbaccc7a0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc1;
@@ -36,7 +36,7 @@ import java.time.LocalDateTime;
  */
 public class CalcTestPrograms {
 
-    static final TableTestProgram SIMPLE_CALC =
+    public static final TableTestProgram SIMPLE_CALC =
             TableTestProgram.of("calc-simple", "validates basic calc node")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
@@ -53,7 +53,7 @@ public class CalcTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
                     .build();
 
-    static final TableTestProgram CALC_PROJECT_PUSHDOWN =
+    public static final TableTestProgram CALC_PROJECT_PUSHDOWN =
             TableTestProgram.of(
                             "calc-project-pushdown", "validates calc node with project pushdown")
                     .setupTableSource(
@@ -73,7 +73,7 @@ public class CalcTestPrograms {
                             "INSERT INTO sink_t SELECT a, CAST(a AS VARCHAR) FROM source_t WHERE a > CAST(1 AS BIGINT)")
                     .build();
 
-    static final TableTestProgram CALC_FILTER =
+    public static final TableTestProgram CALC_FILTER =
             TableTestProgram.of("calc-filter", "validates calc node with filter")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
@@ -90,7 +90,7 @@ public class CalcTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT * FROM source_t WHERE b > 0")
                     .build();
 
-    static final TableTestProgram CALC_FILTER_PUSHDOWN =
+    public static final TableTestProgram CALC_FILTER_PUSHDOWN =
             TableTestProgram.of("calc-filter-pushdown", "validates calc node with filter pushdown")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
@@ -109,7 +109,7 @@ public class CalcTestPrograms {
                             "INSERT INTO sink_t SELECT a, b FROM source_t WHERE a > CAST(420 AS BIGINT)")
                     .build();
 
-    static final TableTestProgram CALC_SARG =
+    public static final TableTestProgram CALC_SARG =
             TableTestProgram.of("calc-sarg", "validates calc node with Sarg")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
@@ -128,7 +128,7 @@ public class CalcTestPrograms {
                             "INSERT INTO sink_t SELECT a FROM source_t WHERE a = 1 or a = 2 or a is null")
                     .build();
 
-    static final TableTestProgram CALC_UDF_SIMPLE =
+    public static final TableTestProgram CALC_UDF_SIMPLE =
             TableTestProgram.of("calc-udf-simple", "validates calc node with simple UDF")
                     .setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
                     .setupTableSource(
@@ -146,7 +146,7 @@ public class CalcTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT a, udf1(a) FROM source_t")
                     .build();
 
-    static final TableTestProgram CALC_UDF_COMPLEX =
+    public static final TableTestProgram CALC_UDF_COMPLEX =
             TableTestProgram.of("calc-udf-complex", "validates calc node with complex UDFs")
                     .setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
                     .setupTemporaryCatalogFunction("udf2", JavaFunc1.class)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
index 49585d85ccb..80b5fde5e53 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
index 9d894718711..ce918a59b41 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
@@ -107,7 +107,7 @@ class MiscTests implements TableTestProgramRunner {
     }
 
     private static List<String> getExpectedResults(SinkTestStep sinkTestStep, 
String tableName) {
-        if (sinkTestStep.getTestChangelogData()) {
+        if (sinkTestStep.shouldTestChangelogData()) {
             return TestValuesTableFactory.getRawResultsAsStrings(tableName);
         } else {
             return TestValuesTableFactory.getResultsAsStrings(tableName);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
similarity index 53%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
index b409296390f..df5930035ac 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
@@ -18,22 +18,15 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.testutils;
 
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.StateBackendOptions;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.RestoreMode;
-import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
@@ -45,86 +38,54 @@ import 
org.apache.flink.table.test.program.TestStep.TestKind;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.net.URI;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Base class for implementing restore tests for {@link ExecNode}.You can 
generate json compiled
- * plan and a savepoint for the latest node version by running {@link
- * RestoreTestBase#generateTestSetupFiles(TableTestProgram)} which is disabled 
by default.
+ * Base class for implementing compiled plan tests for {@link BatchExecNode}. 
You can generate json
+ * compiled plan for the latest node version by running {@link
+ * BatchRestoreTestBase#generateCompiledPlans(TableTestProgram)}. This method 
does not recreate the
+ * compiled plan if it already exists for the given version of the operator.
  *
  * <p><b>Note:</b> The test base uses {@link 
TableConfigOptions.CatalogPlanCompilation#SCHEMA}
- * because it needs to adjust source and sink properties before and after the 
restore. Therefore,
- * the test base can not be used for testing storing table options in the 
compiled plan.
+ * because it needs to adjust source and sink properties. Therefore, the test 
base can not be used
+ * for testing storing table options in the compiled plan.
  */
 @ExtendWith(MiniClusterExtension.class)
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @TestMethodOrder(OrderAnnotation.class)
-public abstract class RestoreTestBase implements TableTestProgramRunner {
+public abstract class BatchRestoreTestBase implements TableTestProgramRunner {
 
     private final Class<? extends ExecNode<?>> execNodeUnderTest;
     private final List<Class<? extends ExecNode<?>>> childExecNodesUnderTest;
-    private final AfterRestoreSource afterRestoreSource;
 
-    protected RestoreTestBase(Class<? extends ExecNode<?>> execNodeUnderTest) {
-        this(execNodeUnderTest, new ArrayList<>(), AfterRestoreSource.FINITE);
+    protected BatchRestoreTestBase(Class<? extends ExecNode<?>> 
execNodeUnderTest) {
+        this(execNodeUnderTest, new ArrayList<>());
     }
 
-    protected RestoreTestBase(
+    protected BatchRestoreTestBase(
             Class<? extends ExecNode<?>> execNodeUnderTest,
             List<Class<? extends ExecNode<?>>> childExecNodesUnderTest) {
-        this(execNodeUnderTest, childExecNodesUnderTest, 
AfterRestoreSource.FINITE);
-    }
-
-    protected RestoreTestBase(
-            Class<? extends ExecNode<?>> execNodeUnderTest, AfterRestoreSource 
state) {
-        this(execNodeUnderTest, new ArrayList<>(), state);
-    }
-
-    protected RestoreTestBase(
-            Class<? extends ExecNode<?>> execNodeUnderTest,
-            List<Class<? extends ExecNode<?>>> childExecNodesUnderTest,
-            AfterRestoreSource state) {
         this.execNodeUnderTest = execNodeUnderTest;
         this.childExecNodesUnderTest = childExecNodesUnderTest;
-        this.afterRestoreSource = state;
-    }
-
-    /**
-     * AfterRestoreSource defines the source behavior while running {@link
-     * RestoreTestBase#testRestore}.
-     */
-    protected enum AfterRestoreSource {
-        FINITE,
-        INFINITE,
-        NO_RESTORE
     }
 
     // Used for testing Restore Test Completeness
@@ -142,7 +103,6 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         return EnumSet.of(
                 TestKind.CONFIG,
                 TestKind.FUNCTION,
-                TestKind.TEMPORAL_FUNCTION,
                 TestKind.SOURCE_WITH_RESTORE_DATA,
                 TestKind.SOURCE_WITH_DATA,
                 TestKind.SINK_WITH_RESTORE_DATA,
@@ -159,8 +119,6 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         TestValuesTableFactory.clearAllData();
     }
 
-    private @TempDir Path tmpDir;
-
     private List<ExecNodeMetadata> getAllMetadata() {
         return 
ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest);
     }
@@ -176,41 +134,17 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
                                 supportedPrograms().stream().map(p -> 
Arguments.of(p, metadata)));
     }
 
-    private void registerSinkObserver(
-            final List<CompletableFuture<?>> futures,
-            final SinkTestStep sinkTestStep,
-            final boolean ignoreAfter) {
-        final CompletableFuture<Object> future = new CompletableFuture<>();
-        futures.add(future);
-        final String tableName = sinkTestStep.name;
-        TestValuesTableFactory.registerLocalRawResultsObserver(
-                tableName,
-                (integer, strings) -> {
-                    List<String> results =
-                            new 
ArrayList<>(sinkTestStep.getExpectedBeforeRestoreAsStrings());
-                    if (!ignoreAfter) {
-                        
results.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
-                    }
-                    List<String> expectedResults = 
getExpectedResults(sinkTestStep, tableName);
-                    final boolean shouldComplete =
-                            CollectionUtils.isEqualCollection(expectedResults, 
results);
-                    if (shouldComplete) {
-                        future.complete(null);
-                    }
-                });
-    }
-
-    /**
-     * Execute this test to generate test files. Remember to be using the 
correct branch when
-     * generating the test files.
-     */
-    @Disabled
+    /** Generates compiled plans for a given TableTestProgram. */
     @ParameterizedTest
     @MethodSource("supportedPrograms")
     @Order(0)
-    public void generateTestSetupFiles(TableTestProgram program) throws 
Exception {
-        final EnvironmentSettings settings = 
EnvironmentSettings.inStreamingMode();
-        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, 
"rocksdb");
+    public void generateCompiledPlans(TableTestProgram program) {
+        Path path = getPlanPath(program, getLatestMetadata());
+        if (path.toFile().exists()) {
+            return;
+        }
+
+        final EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
         final TableEnvironment tEnv = TableEnvironment.create(settings);
         program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
         tEnv.getConfig()
@@ -223,14 +157,13 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("data-id", id);
-            options.put("terminating", "false");
+            options.put("bounded", "true");
+            options.put("terminating", "true");
             options.put("runtime-source", "NewSource");
             sourceTestStep.apply(tEnv, options);
         }
 
-        final List<CompletableFuture<?>> futures = new ArrayList<>();
         for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
-            registerSinkObserver(futures, sinkTestStep, true);
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("sink-insert-only", "false");
@@ -238,7 +171,6 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         }
 
         program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
-        program.getSetupTemporalFunctionTestSteps().forEach(s -> 
s.apply(tEnv));
 
         final CompiledPlan compiledPlan;
         if (program.runSteps.get(0).getKind() == TestKind.STATEMENT_SET) {
@@ -249,39 +181,15 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
             compiledPlan = tEnv.compilePlanSql(sqlTestStep.sql);
         }
 
-        compiledPlan.writeToFile(getPlanPath(program, getLatestMetadata()));
-
-        final TableResult tableResult = compiledPlan.execute();
-        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).get();
-        final JobClient jobClient = tableResult.getJobClient().get();
-        final String savepoint =
-                jobClient
-                        .stopWithSavepoint(false, tmpDir.toString(), 
SavepointFormatType.DEFAULT)
-                        .get();
-        CommonTestUtils.waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.FINISHED));
-        final Path savepointPath = Paths.get(new URI(savepoint));
-        final Path savepointDirPath = getSavepointPath(program, 
getLatestMetadata());
-        Files.createDirectories(savepointDirPath);
-        Files.move(savepointPath, savepointDirPath, 
StandardCopyOption.ATOMIC_MOVE);
+        compiledPlan.writeToFile(path);
     }
 
     @ParameterizedTest
     @MethodSource("createSpecs")
     @Order(1)
-    void testRestore(TableTestProgram program, ExecNodeMetadata metadata) 
throws Exception {
-        final EnvironmentSettings settings = 
EnvironmentSettings.inStreamingMode();
-        final SavepointRestoreSettings restoreSettings;
-        if (afterRestoreSource == AfterRestoreSource.NO_RESTORE) {
-            restoreSettings = SavepointRestoreSettings.none();
-        } else {
-            restoreSettings =
-                    SavepointRestoreSettings.forPath(
-                            getSavepointPath(program, metadata).toString(),
-                            false,
-                            RestoreMode.NO_CLAIM);
-        }
-        SavepointRestoreSettings.toConfiguration(restoreSettings, 
settings.getConfiguration());
-        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, 
"rocksdb");
+    void loadAndRunCompiledPlan(TableTestProgram program, ExecNodeMetadata 
metadata)
+            throws Exception {
+        final EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
         final TableEnvironment tEnv = TableEnvironment.create(settings);
         tEnv.getConfig()
                 .set(
@@ -291,27 +199,21 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
 
         for (SourceTestStep sourceTestStep : 
program.getSetupSourceTestSteps()) {
-            final Collection<Row> data =
-                    afterRestoreSource == AfterRestoreSource.NO_RESTORE
-                            ? sourceTestStep.dataBeforeRestore
-                            : sourceTestStep.dataAfterRestore;
+
+            List<Row> data = new ArrayList<>();
+            data.addAll(sourceTestStep.dataBeforeRestore);
+            data.addAll(sourceTestStep.dataAfterRestore);
             final String id = TestValuesTableFactory.registerData(data);
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("data-id", id);
             options.put("runtime-source", "NewSource");
-            if (afterRestoreSource == AfterRestoreSource.INFINITE) {
-                options.put("terminating", "false");
-            }
+            options.put("terminating", "true");
+            options.put("bounded", "true");
             sourceTestStep.apply(tEnv, options);
         }
 
-        final List<CompletableFuture<?>> futures = new ArrayList<>();
-
         for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
-            if (afterRestoreSource == AfterRestoreSource.INFINITE) {
-                registerSinkObserver(futures, sinkTestStep, false);
-            }
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("disable-lookup", "true");
@@ -320,28 +222,16 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         }
 
         program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
-        program.getSetupTemporalFunctionTestSteps().forEach(s -> 
s.apply(tEnv));
 
         final CompiledPlan compiledPlan =
                 tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, 
metadata)));
 
-        if (afterRestoreSource == AfterRestoreSource.INFINITE) {
-            final TableResult tableResult = compiledPlan.execute();
-            CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).get();
-            tableResult.getJobClient().get().cancel().get();
-        } else {
-            compiledPlan.execute().await();
-            for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
-                List<String> expectedResults = 
getExpectedResults(sinkTestStep, sinkTestStep.name);
-                assertThat(expectedResults)
-                        .containsExactlyInAnyOrder(
-                                Stream.concat(
-                                                
sinkTestStep.getExpectedBeforeRestoreAsStrings()
-                                                        .stream(),
-                                                
sinkTestStep.getExpectedAfterRestoreAsStrings()
-                                                        .stream())
-                                        .toArray(String[]::new));
-            }
+        compiledPlan.execute().await();
+        for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+            List<String> expectedResults = getExpectedResults(sinkTestStep, 
sinkTestStep.name);
+            assertThat(expectedResults)
+                    .containsExactlyInAnyOrder(
+                            sinkTestStep.getExpectedAsStrings().toArray(new 
String[0]));
         }
     }
 
@@ -350,10 +240,6 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
                 getTestResourceDirectory(program, metadata) + "/plan/" + 
program.id + ".json");
     }
 
-    private Path getSavepointPath(TableTestProgram program, ExecNodeMetadata 
metadata) {
-        return Paths.get(getTestResourceDirectory(program, metadata) + 
"/savepoint/");
-    }
-
     private String getTestResourceDirectory(TableTestProgram program, 
ExecNodeMetadata metadata) {
         return String.format(
                 "%s/src/test/resources/restore-tests/%s_%d/%s",
@@ -361,7 +247,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
     }
 
     private static List<String> getExpectedResults(SinkTestStep sinkTestStep, 
String tableName) {
-        if (sinkTestStep.getTestChangelogData()) {
+        if (sinkTestStep.shouldTestChangelogData()) {
             return TestValuesTableFactory.getRawResultsAsStrings(tableName);
         } else {
             return TestValuesTableFactory.getResultsAsStrings(tableName);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index b409296390f..106516b11b8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -361,7 +361,7 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
     }
 
     private static List<String> getExpectedResults(SinkTestStep sinkTestStep, 
String tableName) {
-        if (sinkTestStep.getTestChangelogData()) {
+        if (sinkTestStep.shouldTestChangelogData()) {
             return TestValuesTableFactory.getRawResultsAsStrings(tableName);
         } else {
             return TestValuesTableFactory.getResultsAsStrings(tableName);
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
new file mode 100644
index 00000000000..6130f3a119d
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
@@ -0,0 +1,89 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 14,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 0,
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 420,
+            "type" : "BIGINT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, filter=[>(a, 420:BIGINT)]]], fields=[a, b])",
+    "dynamicFilteringDataListenerID" : "adb172e8-15d9-46f8-a27d-3373a9ff478f",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 15,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter/plan/calc-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter/plan/calc-filter.json
new file mode 100644
index 00000000000..0b9ee103a02
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter/plan/calc-filter.json
@@ -0,0 +1,138 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 11,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "d",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+    "dynamicFilteringDataListenerID" : "bde6e1ba-d0c8-4d78-9eed-b3aea8bbbcb1",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 0,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, b, c, d], where=[(b > 0)])"
+  }, {
+    "id" : 13,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "d",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c, d])"
+  } ],
+  "edges" : [ {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
new file mode 100644
index 00000000000..315a799eaa4
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
@@ -0,0 +1,132 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 16,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 0,
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 1,
+            "type" : "BIGINT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ]
+      }, {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ] ],
+        "producedType" : "ROW<`a` BIGINT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[>(a, 1:BIGINT)], project=[a], 
metadata=[]]], fields=[a])",
+    "dynamicFilteringDataListenerID" : "681c0bb8-b2bb-438f-9cfd-92c61aeba248",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 17,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS 
EXPR$1])"
+  }, {
+    "id" : 18,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "a1",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-sarg/plan/calc-sarg.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-sarg/plan/calc-sarg.json
new file mode 100644
index 00000000000..3d74515cb53
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-sarg/plan/calc-sarg.json
@@ -0,0 +1,129 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 19,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a])",
+    "dynamicFilteringDataListenerID" : "8e80a89c-3b29-4638-af93-77420a9bc628",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 20,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "INTERNAL",
+      "internalName" : "$SEARCH$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "sarg" : {
+          "ranges" : [ {
+            "lower" : {
+              "value" : 1,
+              "boundType" : "CLOSED"
+            },
+            "upper" : {
+              "value" : 1,
+              "boundType" : "CLOSED"
+            }
+          }, {
+            "lower" : {
+              "value" : 2,
+              "boundType" : "CLOSED"
+            },
+            "upper" : {
+              "value" : 2,
+              "boundType" : "CLOSED"
+            }
+          } ],
+          "nullAs" : "TRUE"
+        },
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT>",
+    "description" : "Calc(select=[a], where=[SEARCH(a, Sarg[1, 2; NULL AS 
TRUE])])"
+  }, {
+    "id" : 21,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a])"
+  } ],
+  "edges" : [ {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-simple/plan/calc-simple.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-simple/plan/calc-simple.json
new file mode 100644
index 00000000000..e1e9b0a4ce4
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-simple/plan/calc-simple.json
@@ -0,0 +1,110 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 8,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])",
+    "dynamicFilteringDataListenerID" : "7d70c7cb-2ee4-460e-9a9f-2385a1fd8526",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "DOUBLE"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT, `b` DOUBLE>",
+    "description" : "Calc(select=[(a + 1) AS EXPR$0, b])"
+  }, {
+    "id" : 10,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "DOUBLE"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` BIGINT, `b` DOUBLE>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[EXPR$0, b])"
+  } ],
+  "edges" : [ {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
new file mode 100644
index 00000000000..46983e54a54
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
@@ -0,0 +1,269 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 25,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+    "dynamicFilteringDataListenerID" : "89c634cd-b623-4893-b177-131aa84da4ba",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 26,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf2`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "systemName" : "udf3",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "systemName" : "udf4",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$SUBSTRING$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "VARCHAR(2147483647)"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 1,
+          "type" : "INT NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 5,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf5`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1000,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$AND$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$OR$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "CALL",
+            "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 0,
+              "type" : "BIGINT"
+            } ],
+            "type" : "BIGINT NOT NULL"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$<$1",
+          "operands" : [ {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$*$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 0,
+              "type" : "BIGINT"
+            }, {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 1,
+              "type" : "INT NOT NULL"
+            } ],
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 100,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$>$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "INT NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 10,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT 
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2` 
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+    "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS a1, b, 
udf2(b, b, d) AS b1, udf3(c, b) AS c1, udf4(SUBSTRING(c, 1, 5)) AS c2, udf5(d, 
1000) AS d1], where=[(((udf1(a) > 0) OR ((a * b) < 100)) AND (b > 10))])"
+  }, {
+    "id" : 27,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "a1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c2",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d1",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT 
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2` 
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, a1, b, b1, c1, c2, d1])"
+  } ],
+  "edges" : [ {
+    "source" : 25,
+    "target" : 26,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 26,
+    "target" : 27,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
new file mode 100644
index 00000000000..b1478457069
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
@@ -0,0 +1,108 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 22,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a])",
+    "dynamicFilteringDataListenerID" : "188277cb-2bd5-4084-85e8-60165c0fff02",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 23,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CAST$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        } ],
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Calc(select=[a, udf1(CAST(a AS BIGINT)) AS EXPR$1])"
+  }, {
+    "id" : 24,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "a1",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 23,
+    "target" : 24,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file


Reply via email to