This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a885f4ffd08 [FLINK-35943][table-planner] Add CompiledPlan annotations 
to BatchExecHashJoin and BatchExecNestedLoopJoin
a885f4ffd08 is described below

commit a885f4ffd08a7b138bb7d643723038c8558b2e07
Author: James Hughes <[email protected]>
AuthorDate: Thu Aug 8 03:43:58 2024 -0400

    [FLINK-35943][table-planner] Add CompiledPlan annotations to 
BatchExecHashJoin and BatchExecNestedLoopJoin
---
 .../plan/nodes/exec/batch/BatchExecHashJoin.java   |  80 ++++-
 .../nodes/exec/batch/BatchExecNestedLoopJoin.java  |  50 ++-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   4 +
 .../JoinBatchRestoreTest.java}                     |  30 +-
 .../exec/{stream => common}/JoinTestPrograms.java  |  30 +-
 .../plan/nodes/exec/stream/JoinRestoreTest.java    |   1 +
 .../plan/join-inner-join-with-duplicate-key.json   | 292 +++++++++++++++++
 .../plan/join-inner-join-with-non-equi-join.json   | 254 +++++++++++++++
 .../join-left-join/plan/join-left-join.json        | 249 ++++++++++++++
 .../join-non-window-inner-join-with-null-cond.json | 361 +++++++++++++++++++++
 .../plan/join-non-window-inner-join.json           | 361 +++++++++++++++++++++
 .../join-outer-join/plan/join-outer-join.json      | 249 ++++++++++++++
 .../join-right-join/plan/join-right-join.json      | 249 ++++++++++++++
 .../join-semi-join/plan/join-semi-join.json        | 242 ++++++++++++++
 .../join-with-filter/plan/join-with-filter.json    | 312 ++++++++++++++++++
 15 files changed, 2732 insertions(+), 32 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
index 7a82a281997..8543072511c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -35,6 +36,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
@@ -51,20 +53,65 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Arrays;
+import java.util.List;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** {@link BatchExecNode} for Hash Join. */
+@ExecNodeMetadata(
+        name = "batch-exec-join",
+        version = 1,
+        producedTransformations = BatchExecHashJoin.JOIN_TRANSFORMATION,
+        consumedOptions = {
+            "table.exec.resource.hash-join.memory",
+            "table.exec.resource.external-buffer-memory",
+            "table.exec.resource.sort.memory",
+            "table.exec.spill-compression.enabled",
+            "table.exec.spill-compression.block-size"
+        },
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecHashJoin extends ExecNodeBase<RowData>
         implements BatchExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
 
+    public static final String JOIN_TRANSFORMATION = "join";
+    public static final String FIELD_NAME_JOIN_SPEC = "joinSpec";
+    public static final String FIELD_NAME_IS_BROADCAST = "isBroadcast";
+    public static final String FIELD_NAME_LEFT_IS_BUILD = "leftIsBuild";
+    public static final String FIELD_NAME_ESTIMATED_LEFT_AVG_ROW_SIZE = 
"estimatedLeftAvgRowSize";
+    public static final String FIELD_NAME_ESTIMATED_RIGHT_AVG_ROW_SIZE = 
"estimatedRightAvgRowSize";
+    public static final String FIELD_NAME_ESTIMATED_LEFT_ROW_COUNT = 
"estimatedLeftRowCount";
+    public static final String FIELD_NAME_ESTIMATED_RIGHT_ROW_COUNT = 
"estimatedRightRowCount";
+    public static final String FIELD_NAME_TRY_DISTINCT_BUILD_ROW = 
"tryDistinctBuildRow";
+
+    @JsonProperty(FIELD_NAME_JOIN_SPEC)
     private final JoinSpec joinSpec;
+
+    @JsonProperty(FIELD_NAME_IS_BROADCAST)
     private final boolean isBroadcast;
+
+    @JsonProperty(FIELD_NAME_LEFT_IS_BUILD)
     private final boolean leftIsBuild;
+
+    @JsonProperty(FIELD_NAME_ESTIMATED_LEFT_AVG_ROW_SIZE)
     private final int estimatedLeftAvgRowSize;
+
+    @JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_AVG_ROW_SIZE)
     private final int estimatedRightAvgRowSize;
+
+    @JsonProperty(FIELD_NAME_ESTIMATED_LEFT_ROW_COUNT)
     private final long estimatedLeftRowCount;
+
+    @JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_ROW_COUNT)
     private final long estimatedRightRowCount;
+
+    @JsonProperty(FIELD_NAME_TRY_DISTINCT_BUILD_ROW)
     private final boolean tryDistinctBuildRow;
 
     public BatchExecHashJoin(
@@ -88,7 +135,35 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
                 Arrays.asList(leftInputProperty, rightInputProperty),
                 outputType,
                 description);
-        this.joinSpec = joinSpec;
+        this.joinSpec = checkNotNull(joinSpec);
+        this.isBroadcast = isBroadcast;
+        this.leftIsBuild = leftIsBuild;
+        this.estimatedLeftAvgRowSize = estimatedLeftAvgRowSize;
+        this.estimatedRightAvgRowSize = estimatedRightAvgRowSize;
+        this.estimatedLeftRowCount = estimatedLeftRowCount;
+        this.estimatedRightRowCount = estimatedRightRowCount;
+        this.tryDistinctBuildRow = tryDistinctBuildRow;
+    }
+
+    @JsonCreator
+    public BatchExecHashJoin(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
+            @JsonProperty(FIELD_NAME_ESTIMATED_LEFT_AVG_ROW_SIZE) int 
estimatedLeftAvgRowSize,
+            @JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_AVG_ROW_SIZE) int 
estimatedRightAvgRowSize,
+            @JsonProperty(FIELD_NAME_ESTIMATED_LEFT_ROW_COUNT) long 
estimatedLeftRowCount,
+            @JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_ROW_COUNT) long 
estimatedRightRowCount,
+            @JsonProperty(FIELD_NAME_IS_BROADCAST) boolean isBroadcast,
+            @JsonProperty(FIELD_NAME_LEFT_IS_BUILD) boolean leftIsBuild,
+            @JsonProperty(FIELD_NAME_TRY_DISTINCT_BUILD_ROW) boolean 
tryDistinctBuildRow,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        checkArgument(inputProperties.size() == 2);
+        this.joinSpec = checkNotNull(joinSpec);
         this.isBroadcast = isBroadcast;
         this.leftIsBuild = leftIsBuild;
         this.estimatedLeftAvgRowSize = estimatedLeftAvgRowSize;
@@ -268,8 +343,7 @@ public class BatchExecHashJoin extends ExecNodeBase<RowData>
         return ExecNodeUtil.createTwoInputTransformation(
                 buildTransform,
                 probeTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                
createTransformationMeta(BatchExecHashJoin.JOIN_TRANSFORMATION, config),
                 operator,
                 InternalTypeInfo.of(getOutputType()),
                 probeTransform.getParallelism(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
index 35b7f194c94..2ba754ea725 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -29,6 +30,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -37,19 +39,44 @@ import 
org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexNode;
 
 import java.util.Arrays;
+import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** {@link BatchExecNode} for Nested-loop Join. */
+@ExecNodeMetadata(
+        name = "batch-exec-nested-loop-join",
+        version = 1,
+        producedTransformations = BatchExecNestedLoopJoin.JOIN_TRANSFORMATION,
+        consumedOptions = {"table.exec.resource.external-buffer-memory"},
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecNestedLoopJoin extends ExecNodeBase<RowData>
         implements BatchExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
 
+    public static final String JOIN_TRANSFORMATION = "nested-loop-join";
+    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+    public static final String FIELD_NAME_LEFT_IS_BUILD = "leftIsBuild";
+    public static final String FIELD_NAME_CONDITION = "condition";
+    public static final String FIELD_NAME_SINGLE_ROW_JOIN = "singleRowJoin";
+
+    @JsonProperty(FIELD_NAME_JOIN_TYPE)
     private final FlinkJoinType joinType;
+
+    @JsonProperty(FIELD_NAME_CONDITION)
     private final RexNode condition;
+
+    @JsonProperty(FIELD_NAME_LEFT_IS_BUILD)
     private final boolean leftIsBuild;
+
+    @JsonProperty(FIELD_NAME_SINGLE_ROW_JOIN)
     private final boolean singleRowJoin;
 
     public BatchExecNestedLoopJoin(
@@ -75,6 +102,26 @@ public class BatchExecNestedLoopJoin extends 
ExecNodeBase<RowData>
         this.singleRowJoin = singleRowJoin;
     }
 
+    @JsonCreator
+    public BatchExecNestedLoopJoin(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType,
+            @JsonProperty(FIELD_NAME_CONDITION) RexNode condition,
+            @JsonProperty(FIELD_NAME_LEFT_IS_BUILD) boolean leftIsBuild,
+            @JsonProperty(FIELD_NAME_SINGLE_ROW_JOIN) boolean singleRowJoin,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        checkArgument(inputProperties.size() == 2);
+        this.joinType = checkNotNull(joinType);
+        this.condition = checkNotNull(condition);
+        this.leftIsBuild = leftIsBuild;
+        this.singleRowJoin = singleRowJoin;
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     protected Transformation<RowData> translateToPlanInternal(
@@ -118,8 +165,7 @@ public class BatchExecNestedLoopJoin extends 
ExecNodeBase<RowData>
         return ExecNodeUtil.createTwoInputTransformation(
                 leftInputTransform,
                 rightInputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(JOIN_TRANSFORMATION, config),
                 operator,
                 InternalTypeInfo.of(getOutputType()),
                 parallelism,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 9f2ec090968..ad57d0f49c8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -31,6 +31,8 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashJoin;
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJoin;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
@@ -163,6 +165,8 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecSort.class);
                     add(BatchExecValues.class);
                     add(BatchExecCorrelate.class);
+                    add(BatchExecHashJoin.class);
+                    add(BatchExecNestedLoopJoin.class);
                 }
             };
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/JoinBatchRestoreTest.java
similarity index 62%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/JoinBatchRestoreTest.java
index ef84b978e29..0058bf87512 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/JoinBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.planner.plan.nodes.exec.common.JoinTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecJoin}. */
-public class JoinRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecHashJoin}. */
+public class JoinBatchRestoreTest extends BatchRestoreTestBase {
 
-    public JoinRestoreTest() {
-        super(StreamExecJoin.class);
+    public JoinBatchRestoreTest() {
+        super(BatchExecHashJoin.class);
     }
 
     @Override
@@ -36,17 +37,22 @@ public class JoinRestoreTest extends RestoreTestBase {
         return Arrays.asList(
                 JoinTestPrograms.NON_WINDOW_INNER_JOIN,
                 JoinTestPrograms.NON_WINDOW_INNER_JOIN_WITH_NULL,
-                JoinTestPrograms.CROSS_JOIN,
+                // Requires BatchExecMultipleInputNode
+                // JoinTestPrograms.CROSS_JOIN,
                 JoinTestPrograms.JOIN_WITH_FILTER,
                 JoinTestPrograms.INNER_JOIN_WITH_DUPLICATE_KEY,
                 JoinTestPrograms.INNER_JOIN_WITH_NON_EQUI_JOIN,
-                JoinTestPrograms.INNER_JOIN_WITH_EQUAL_PK,
-                JoinTestPrograms.INNER_JOIN_WITH_PK,
+                // Requires BatchExecMultipleInputNode
+                // JoinTestPrograms.INNER_JOIN_WITH_EQUAL_PK,
+                // Requires another Join operator
+                // JoinTestPrograms.INNER_JOIN_WITH_PK,
                 JoinTestPrograms.FULL_OUTER,
                 JoinTestPrograms.LEFT_JOIN,
                 JoinTestPrograms.RIGHT_JOIN,
-                JoinTestPrograms.SEMI_JOIN,
-                JoinTestPrograms.ANTI_JOIN,
-                JoinTestPrograms.JOIN_WITH_STATE_TTL_HINT);
+                JoinTestPrograms.SEMI_JOIN
+                // Requires BatchExecMultipleInputNode
+                // JoinTestPrograms.ANTI_JOIN,
+                // JoinTestPrograms.JOIN_WITH_STATE_TTL_HINT
+                );
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
index 4cf4cf346b9..1306b449d44 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
@@ -25,20 +25,20 @@ import org.apache.flink.types.Row;
 
 /** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
 public class JoinTestPrograms {
-    static final TableTestProgram NON_WINDOW_INNER_JOIN;
-    static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL;
-    static final TableTestProgram CROSS_JOIN;
-    static final TableTestProgram JOIN_WITH_FILTER;
-    static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY;
-    static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN;
-    static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK;
-    static final TableTestProgram INNER_JOIN_WITH_PK;
-    static final TableTestProgram LEFT_JOIN;
-    static final TableTestProgram FULL_OUTER;
-    static final TableTestProgram RIGHT_JOIN;
-    static final TableTestProgram SEMI_JOIN;
-    static final TableTestProgram ANTI_JOIN;
-    static final TableTestProgram JOIN_WITH_STATE_TTL_HINT;
+    public static final TableTestProgram NON_WINDOW_INNER_JOIN;
+    public static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL;
+    public static final TableTestProgram CROSS_JOIN;
+    public static final TableTestProgram JOIN_WITH_FILTER;
+    public static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY;
+    public static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN;
+    public static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK;
+    public static final TableTestProgram INNER_JOIN_WITH_PK;
+    public static final TableTestProgram LEFT_JOIN;
+    public static final TableTestProgram FULL_OUTER;
+    public static final TableTestProgram RIGHT_JOIN;
+    public static final TableTestProgram SEMI_JOIN;
+    public static final TableTestProgram ANTI_JOIN;
+    public static final TableTestProgram JOIN_WITH_STATE_TTL_HINT;
 
     static final SourceTestStep EMPLOYEE =
             SourceTestStep.newBuilder("EMPLOYEE")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
index ef84b978e29..5cea793c91f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.planner.plan.nodes.exec.common.JoinTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-inner-join-with-duplicate-key/plan/join-inner-join-with-duplicate-key.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-inner-join-with-duplicate-key/plan/join-inner-join-with-duplicate-key.json
new file mode 100644
index 00000000000..fe1c87a2c63
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-inner-join-with-duplicate-key/plan/join-inner-join-with-duplicate-key.json
@@ -0,0 +1,292 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 27,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`EMPLOYEE`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "salary",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ] ],
+        "producedType" : "ROW<`deptno` INT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`deptno` INT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`deptno` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, EMPLOYEE, project=[deptno], metadata=[]]], fields=[deptno])",
+    "dynamicFilteringDataListenerID" : "a7f20adf-7019-4dd9-bdbe-c409221d57ad",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 28,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT>",
+    "description" : "Exchange(distribution=[hash[deptno]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 29,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`DEPARTMENT`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "department_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "b2",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b3",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`department_num` INT, `b3` INT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`department_num` INT, `b3` INT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`department_num` INT, `b3` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, DEPARTMENT, project=[department_num, b3], metadata=[]]], 
fields=[department_num, b3])",
+    "dynamicFilteringDataListenerID" : "c2902def-9fdc-408a-9a53-51995a016084",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 30,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$=$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`department_num` INT, `b3` INT>",
+    "description" : "Calc(select=[department_num, b3], where=[(b3 = 
department_num)])"
+  }, {
+    "id" : 31,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`department_num` INT, `b3` INT>",
+    "description" : "Exchange(distribution=[hash[b3]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 32,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 1 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "estimatedLeftAvgRowSize" : 4,
+    "estimatedRightAvgRowSize" : 8,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 15000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : false,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `department_num` INT, `b3` INT>",
+    "description" : "HashJoin(joinType=[InnerJoin], where=[(deptno = b3)], 
select=[deptno, department_num, b3], build=[right])"
+  }, {
+    "id" : 33,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `department_num` INT>",
+    "description" : "Calc(select=[deptno, department_num])"
+  }, {
+    "id" : 34,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_num",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `department_num` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[deptno, department_num])"
+  } ],
+  "edges" : [ {
+    "source" : 27,
+    "target" : 28,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 29,
+    "target" : 30,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 30,
+    "target" : 31,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 28,
+    "target" : 32,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 31,
+    "target" : 32,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 32,
+    "target" : 33,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 33,
+    "target" : 34,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-inner-join-with-non-equi-join/plan/join-inner-join-with-non-equi-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-inner-join-with-non-equi-join/plan/join-inner-join-with-non-equi-join.json
new file mode 100644
index 00000000000..4cbca7a61d3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-inner-join-with-non-equi-join/plan/join-inner-join-with-non-equi-join.json
@@ -0,0 +1,254 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 35,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`EMPLOYEE`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "salary",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`deptno` INT, `salary` BIGINT, `name` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, EMPLOYEE]], fields=[deptno, salary, name])",
+    "dynamicFilteringDataListenerID" : "35adc12e-f1f2-492f-9def-bdcf9df01e5d",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 36,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `salary` BIGINT, `name` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[deptno]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 37,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`DEPARTMENT`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "department_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "b2",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b3",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ], [ 3 ] ],
+        "producedType" : "ROW<`department_num` INT, `b2` BIGINT, 
`department_name` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`department_num` INT, `b2` BIGINT, 
`department_name` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`department_num` INT, `b2` BIGINT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, DEPARTMENT, project=[department_num, b2, department_name], 
metadata=[]]], fields=[department_num, b2, department_name])",
+    "dynamicFilteringDataListenerID" : "515898ca-4c7a-4098-bd68-20ed2778268e",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 38,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`department_num` INT, `b2` BIGINT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[department_num]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 39,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$>$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "BIGINT"
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 4,
+          "type" : "BIGINT"
+        } ],
+        "type" : "BOOLEAN"
+      }
+    },
+    "estimatedLeftAvgRowSize" : 24,
+    "estimatedRightAvgRowSize" : 24,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 100000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : true,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    } ],
+    "outputType" : "ROW<`deptno` INT, `salary` BIGINT, `name` 
VARCHAR(2147483647), `department_num` INT, `b2` BIGINT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[InnerJoin], where=[((deptno = 
department_num) AND (salary > b2))], select=[deptno, salary, name, 
department_num, b2, department_name], build=[left])"
+  }, {
+    "id" : 40,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, department_name])"
+  }, {
+    "id" : 41,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[name, department_name])"
+  } ],
+  "edges" : [ {
+    "source" : 35,
+    "target" : 36,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 37,
+    "target" : 38,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 36,
+    "target" : 39,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 38,
+    "target" : 39,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 39,
+    "target" : 40,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 40,
+    "target" : 41,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-left-join/plan/join-left-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-left-join/plan/join-left-join.json
new file mode 100644
index 00000000000..b2a8d69e307
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-left-join/plan/join-left-join.json
@@ -0,0 +1,249 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 62,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`EMPLOYEE`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "salary",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      } ]
+    },
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, EMPLOYEE, project=[deptno, name], metadata=[]]], 
fields=[deptno, name])",
+    "dynamicFilteringDataListenerID" : "b014fc6c-82cd-46a9-be7f-63d5348c7774",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 63,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[deptno]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 64,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`DEPARTMENT`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "department_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "b2",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b3",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 3 ] ],
+        "producedType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, DEPARTMENT, project=[department_num, department_name], 
metadata=[]]], fields=[department_num, department_name])",
+    "dynamicFilteringDataListenerID" : "32c21a3e-eda8-4739-af0d-d2747df0dfdf",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 65,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[department_num]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 66,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "LEFT",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "estimatedLeftAvgRowSize" : 16,
+    "estimatedRightAvgRowSize" : 16,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 100000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : true,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647), 
`department_num` INT, `department_name` VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[LeftOuterJoin], where=[(deptno = 
department_num)], select=[deptno, name, department_num, department_name], 
build=[left])"
+  }, {
+    "id" : 67,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, department_name])"
+  }, {
+    "id" : 68,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[name, department_name])"
+  } ],
+  "edges" : [ {
+    "source" : 62,
+    "target" : 63,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 64,
+    "target" : 65,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 63,
+    "target" : 66,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 65,
+    "target" : 66,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 66,
+    "target" : 67,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 67,
+    "target" : 68,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-non-window-inner-join-with-null-cond/plan/join-non-window-inner-join-with-null-cond.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-non-window-inner-join-with-null-cond/plan/join-non-window-inner-join-with-null-cond.json
new file mode 100644
index 00000000000..f7c6c8b601e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-non-window-inner-join-with-null-cond/plan/join-non-window-inner-join-with-null-cond.json
@@ -0,0 +1,361 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 10,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`T1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, T1]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "91e5b69e-82c2-4d23-ac04-9211164073e1",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 11,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$IF$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 3,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "LITERAL",
+        "value" : null,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[IF((a = 3), null:INTEGER, a) AS a, b, c])"
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 13,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`T2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, T2]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "47712f45-0f47-42ab-8baa-0885733bc39c",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 14,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$IF$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 3,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "LITERAL",
+        "value" : null,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[IF((a = 3), null:INTEGER, a) AS a, b, c])"
+  }, {
+    "id" : 15,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 16,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ false ],
+      "nonEquiCondition" : {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$>$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "BIGINT"
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 4,
+          "type" : "BIGINT"
+        } ],
+        "type" : "BOOLEAN"
+      }
+    },
+    "estimatedLeftAvgRowSize" : 21,
+    "estimatedRightAvgRowSize" : 21,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 100000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : true,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `a0` 
INT, `b0` BIGINT, `c0` VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[InnerJoin], where=[(IS NOT DISTINCT 
FROM(a, a0) AND (b > b0))], select=[a, b, c, a0, b0, c0], build=[left])"
+  }, {
+    "id" : 17,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647), `c0` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a0 AS a, c0 AS c, c AS c0])"
+  }, {
+    "id" : 18,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "c1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c2",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647), `c0` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, c, c0])"
+  } ],
+  "edges" : [ {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-non-window-inner-join/plan/join-non-window-inner-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-non-window-inner-join/plan/join-non-window-inner-join.json
new file mode 100644
index 00000000000..88ff96dda72
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-non-window-inner-join/plan/join-non-window-inner-join.json
@@ -0,0 +1,361 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`T1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, T1]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "051539d2-0771-48e1-982f-0f229890919b",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$IF$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 3,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "LITERAL",
+        "value" : null,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[IF((a = 3), null:INTEGER, a) AS a, b, c])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`T2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, T2]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "1714d33b-3df1-49f3-8b58-507e8231f858",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$IF$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 3,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "LITERAL",
+        "value" : null,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[IF((a = 3), null:INTEGER, a) AS a, b, c])"
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 7,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$>$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "BIGINT"
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 4,
+          "type" : "BIGINT"
+        } ],
+        "type" : "BOOLEAN"
+      }
+    },
+    "estimatedLeftAvgRowSize" : 21,
+    "estimatedRightAvgRowSize" : 21,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 100000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : true,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `a0` 
INT, `b0` BIGINT, `c0` VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[InnerJoin], where=[((a = a0) AND (b > 
b0))], select=[a, b, c, a0, b0, c0], build=[left])"
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647), `c0` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a0 AS a, c0 AS c, c AS c0])"
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "c1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c2",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `c` VARCHAR(2147483647), `c0` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, c, c0])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-outer-join/plan/join-outer-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-outer-join/plan/join-outer-join.json
new file mode 100644
index 00000000000..0cf6d5d3484
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-outer-join/plan/join-outer-join.json
@@ -0,0 +1,249 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 55,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`EMPLOYEE`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "salary",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      } ]
+    },
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, EMPLOYEE, project=[deptno, name], metadata=[]]], 
fields=[deptno, name])",
+    "dynamicFilteringDataListenerID" : "bef07ee8-adaf-4773-a84a-5921f7919f5e",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 56,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[deptno]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 57,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`DEPARTMENT`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "department_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "b2",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b3",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 3 ] ],
+        "producedType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, DEPARTMENT, project=[department_num, department_name], 
metadata=[]]], fields=[department_num, department_name])",
+    "dynamicFilteringDataListenerID" : "53424f3f-742e-40ca-82f1-b628498a70ba",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 58,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[department_num]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 59,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "FULL",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "estimatedLeftAvgRowSize" : 16,
+    "estimatedRightAvgRowSize" : 16,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 100000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : true,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647), 
`department_num` INT, `department_name` VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[FullOuterJoin], where=[(deptno = 
department_num)], select=[deptno, name, department_num, department_name], 
build=[left])"
+  }, {
+    "id" : 60,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, department_name])"
+  }, {
+    "id" : 61,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[name, department_name])"
+  } ],
+  "edges" : [ {
+    "source" : 55,
+    "target" : 56,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 57,
+    "target" : 58,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 56,
+    "target" : 59,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 58,
+    "target" : 59,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 59,
+    "target" : 60,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 60,
+    "target" : 61,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-right-join/plan/join-right-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-right-join/plan/join-right-join.json
new file mode 100644
index 00000000000..cb273a0aa01
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-right-join/plan/join-right-join.json
@@ -0,0 +1,249 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 69,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`EMPLOYEE`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "salary",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      } ]
+    },
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, EMPLOYEE, project=[deptno, name], metadata=[]]], 
fields=[deptno, name])",
+    "dynamicFilteringDataListenerID" : "0c71ffa4-9057-4f8d-bef6-63c15c9a3ca2",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 70,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[deptno]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 71,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`DEPARTMENT`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "department_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "b2",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b3",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 3 ] ],
+        "producedType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, DEPARTMENT, project=[department_num, department_name], 
metadata=[]]], fields=[department_num, department_name])",
+    "dynamicFilteringDataListenerID" : "67c98cef-8b6c-4154-8c74-7fc0643548cf",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 72,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`department_num` INT, `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[department_num]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 73,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "RIGHT",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "estimatedLeftAvgRowSize" : 16,
+    "estimatedRightAvgRowSize" : 16,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 100000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : true,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647), 
`department_num` INT, `department_name` VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[RightOuterJoin], where=[(deptno = 
department_num)], select=[deptno, name, department_num, department_name], 
build=[left])"
+  }, {
+    "id" : 74,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, department_name])"
+  }, {
+    "id" : 75,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[name, department_name])"
+  } ],
+  "edges" : [ {
+    "source" : 69,
+    "target" : 70,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 71,
+    "target" : 72,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 70,
+    "target" : 73,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 72,
+    "target" : 73,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 73,
+    "target" : 74,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 74,
+    "target" : 75,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-semi-join/plan/join-semi-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-semi-join/plan/join-semi-join.json
new file mode 100644
index 00000000000..0cc3ddf2590
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-semi-join/plan/join-semi-join.json
@@ -0,0 +1,242 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 76,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`EMPLOYEE`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "salary",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      } ]
+    },
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, EMPLOYEE, project=[deptno, name], metadata=[]]], 
fields=[deptno, name])",
+    "dynamicFilteringDataListenerID" : "96b374ee-2e53-46f3-8ec5-fc92f2184085",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 77,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[deptno]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 78,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`DEPARTMENT`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "department_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "b2",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b3",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ] ],
+        "producedType" : "ROW<`department_num` INT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`department_num` INT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`department_num` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, DEPARTMENT, project=[department_num], metadata=[]]], 
fields=[department_num])",
+    "dynamicFilteringDataListenerID" : "067e2452-9694-4dc5-828c-328592fda40b",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 79,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`department_num` INT>",
+    "description" : "Exchange(distribution=[hash[department_num]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 80,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "SEMI",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "estimatedLeftAvgRowSize" : 16,
+    "estimatedRightAvgRowSize" : 4,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 100000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : false,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`deptno` INT, `name` VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[LeftSemiJoin], where=[(deptno = 
department_num)], select=[deptno, name], build=[right])"
+  }, {
+    "id" : 81,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name])"
+  }, {
+    "id" : 82,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[name])"
+  } ],
+  "edges" : [ {
+    "source" : 76,
+    "target" : 77,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 78,
+    "target" : 79,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 77,
+    "target" : 80,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 79,
+    "target" : 80,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 80,
+    "target" : 81,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 81,
+    "target" : 82,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-with-filter/plan/join-with-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-with-filter/plan/join-with-filter.json
new file mode 100644
index 00000000000..e0ed9cbafb6
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-join_1/join-with-filter/plan/join-with-filter.json
@@ -0,0 +1,312 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 19,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`EMPLOYEE`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "deptno",
+              "dataType" : "INT"
+            }, {
+              "name" : "salary",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$<$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 1,
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 2,
+            "type" : "BIGINT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ]
+      }, {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 1 ], [ 2 ] ],
+        "producedType" : "ROW<`salary` BIGINT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`salary` BIGINT, `name` VARCHAR(2147483647)> NOT 
NULL"
+      } ]
+    },
+    "outputType" : "ROW<`salary` BIGINT, `name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, EMPLOYEE, filter=[<(salary, 2:BIGINT)], project=[salary, 
name], metadata=[]]], fields=[salary, name])",
+    "dynamicFilteringDataListenerID" : "008301c9-299b-4732-8232-47358a1ff567",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 20,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`salary` BIGINT, `name` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[salary]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 21,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`DEPARTMENT`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "department_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "b2",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b3",
+              "dataType" : "INT"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      }, {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 1 ], [ 3 ] ],
+        "producedType" : "ROW<`b2` BIGINT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`b2` BIGINT, `department_name` 
VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`b2` BIGINT, `department_name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, DEPARTMENT, filter=[], project=[b2, department_name], 
metadata=[]]], fields=[b2, department_name])",
+    "dynamicFilteringDataListenerID" : "3a85f228-a6f4-4840-8737-c6dced82a5db",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 22,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$<$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 2,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b2` BIGINT, `department_name` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[b2, department_name], where=[(b2 < 2)])"
+  }, {
+    "id" : 23,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b2` BIGINT, `department_name` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[b2]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 24,
+    "type" : "batch-exec-join_1",
+    "configuration" : {
+      "table.exec.resource.external-buffer-memory" : "10 mb",
+      "table.exec.resource.hash-join.memory" : "128 mb",
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 0 ],
+      "rightKeys" : [ 0 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "estimatedLeftAvgRowSize" : 20,
+    "estimatedRightAvgRowSize" : 20,
+    "estimatedLeftRowCount" : 100000000,
+    "estimatedRightRowCount" : 50000000,
+    "isBroadcast" : false,
+    "leftIsBuild" : false,
+    "tryDistinctBuildRow" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 1
+    }, {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`salary` BIGINT, `name` VARCHAR(2147483647), `b2` 
BIGINT, `department_name` VARCHAR(2147483647)>",
+    "description" : "HashJoin(joinType=[InnerJoin], where=[(salary = b2)], 
select=[salary, name, b2, department_name], build=[right])"
+  }, {
+    "id" : 25,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, department_name])"
+  }, {
+    "id" : 26,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "department_name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `department_name` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[name, department_name])"
+  } ],
+  "edges" : [ {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 24,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 23,
+    "target" : 24,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 24,
+    "target" : 25,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 25,
+    "target" : 26,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to