This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fabff4f38a56b29400b644d370e99cb8a398e8bb
Author: yuzhao.cyz <[email protected]>
AuthorDate: Tue Mar 17 21:35:48 2020 +0800

    [FLINK-14338][table-planner][table-planner-blink] Update all kinds of left 
plan changes
    
    * Some join order changes for blink-planner due to the rule fire sequence 
changes, see 
https://github.com/apache/calcite/commit/35caa059a762094c7df0b30e9b51358a19b48ac2,
 they are still correct
    * The Correlate row count estimation has been fixed from a always 1 to join 
like estimation, thus, if the inputs of Join is a Correlate, the join algorithm 
would very probably changes, i.e. batch.sql.SubplanReuseTest
    * Due to CALCITE-3729, the filter condition was pushed down for some Join 
cases: batch.sql.join.JoinReorderTest
    * Due to CALCITE-2450 RexNode normalization, the predicates sequence of 
some test changes: logical.subquery.FlinkRewriteSubQueryRuleTest
    * The Decimal modulus precision inference has been fixed: 
planner.expressions.DecimalTypeTest
---
 .../src/main/codegen/data/Parser.tdd               |   1 +
 flink-table/flink-table-planner-blink/pom.xml      |   5 +
 .../src/main/resources/META-INF/NOTICE             |   3 +
 .../table/planner/calcite/FlinkRelFactories.scala  |  16 ++-
 .../WindowAggregateReduceFunctionsRule.scala       |  18 +--
 .../batch/BatchExecWindowAggregateRule.scala       |  20 +---
 .../planner/plan/batch/sql/SubplanReuseTest.xml    |  41 +++----
 .../plan/batch/sql/agg/HashAggregateTest.xml       |   6 +-
 .../plan/batch/sql/agg/SortAggregateTest.xml       |   6 +-
 .../sql/join/BroadcastHashSemiAntiJoinTest.xml     |  40 +++----
 .../plan/batch/sql/join/JoinReorderTest.xml        |  85 +++++++-------
 .../batch/sql/join/NestedLoopSemiAntiJoinTest.xml  |  82 +++++++------
 .../plan/batch/sql/join/SemiAntiJoinTest.xml       | 126 ++++++++++----------
 .../sql/join/ShuffledHashSemiAntiJoinTest.xml      | 130 +++++++++++++++++----
 .../batch/sql/join/SortMergeSemiAntiJoinTest.xml   |  45 ++++---
 .../JoinDependentConditionDerivationRuleTest.xml   |  14 +--
 .../subquery/FlinkRewriteSubQueryRuleTest.xml      |   2 +-
 .../logical/subquery/SubQueryAntiJoinTest.xml      |  24 ++--
 .../planner/plan/stream/sql/agg/AggregateTest.xml  |   2 +-
 .../plan/stream/sql/agg/TwoStageAggregateTest.xml  |   2 +-
 .../plan/stream/sql/join/SemiAntiJoinTest.xml      |  86 +++++++-------
 .../planner/plan/stream/table/AggregateTest.xml    |   4 +-
 .../plan/stream/table/TwoStageAggregateTest.xml    |   4 +-
 .../planner/expressions/DecimalTypeTest.scala      |   6 +-
 .../sql/join/ShuffledHashSemiAntiJoinTest.scala    |  21 ----
 .../planner/plan/utils/FlinkRexUtilTest.scala      |   6 +-
 .../catalog/QueryOperationCatalogViewTable.java    |  30 ++++-
 .../table/operations/PlannerQueryOperation.java    |  20 +++-
 .../table/api/batch/sql/GroupWindowTest.scala      |   6 +-
 .../table/api/batch/sql/SetOperatorsTest.scala     |  12 +-
 .../flink/table/api/batch/table/CalcTest.scala     |   2 +-
 .../table/api/stream/sql/GroupWindowTest.scala     |  10 +-
 .../flink/table/api/stream/sql/JoinTest.scala      |   2 +-
 .../api/stream/sql/TemporalTableJoinTest.scala     |  17 +--
 .../api/stream/table/TemporalTableJoinTest.scala   |  17 +--
 .../flink/table/plan/RexProgramExtractorTest.scala |   4 +-
 .../resources/testSqlUpdateAndToDataStream.out     |   2 +-
 37 files changed, 496 insertions(+), 421 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 14158c7..f11f5d9 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -68,6 +68,7 @@
 
   # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is 
not a reserved
   # keyword, please also add it to 'nonReservedKeywords' section.
+  # Please keep the keyword in alphabetical order if new keyword is added.
   keywords: [
     "BYTES"
     "CATALOGS"
diff --git a/flink-table/flink-table-planner-blink/pom.xml 
b/flink-table/flink-table-planner-blink/pom.xml
index b61f7d1..0bb7aff 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -320,6 +320,7 @@ under the License.
                                                                        
<include>com.fasterxml.jackson.core:jackson-databind</include>
                                                                        
<include>com.fasterxml.jackson.core:jackson-annotations</include>
                                                                        
<include>commons-codec:commons-codec</include>
+                                                                       
<include>commons-io:commons-io</include>
 
                                                                        <!-- 
flink-table-planner-blink dependencies -->
                                                                        
<include>org.apache.flink:flink-sql-parser</include>
@@ -352,6 +353,10 @@ under the License.
                                                                        
<pattern>org.apache.commons.codec</pattern>
                                                                        
<shadedPattern>org.apache.flink.calcite.shaded.org.apache.commons.codec</shadedPattern>
                                                                </relocation>
+                                                               <relocation>
+                                                                       
<pattern>org.apache.commons.io</pattern>
+                                                                       
<shadedPattern>org.apache.flink.calcite.shaded.org.apache.commons.io</shadedPattern>
+                                                               </relocation>
 
                                                                <!-- 
flink-table-planner dependencies -->
                                                                <!-- not 
relocated for now, because we need to change the contents of the properties 
field otherwise -->
diff --git 
a/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE 
b/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE
index 381d6d5..e1246a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner-blink/src/main/resources/META-INF/NOTICE
@@ -12,6 +12,9 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - com.fasterxml.jackson.core:jackson-databind:2.10.1
 - com.jayway.jsonpath:json-path:2.4.0
 - joda-time:joda-time:2.5
+- net.minidev:json-smart:jar:2.3
+- net.minidev:accessors-smart:jar:1.2
+- org.ow2.asm:asm:jar:5.0.4
 - org.apache.calcite:calcite-core:1.22.0
 - org.apache.calcite:calcite-linq4j:1.22.0
 - org.apache.calcite.avatica:avatica-core:1.16.0
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala
index 3af65df..1f002b2 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelFactories.scala
@@ -24,9 +24,10 @@ import org.apache.flink.table.sinks.TableSink
 
 import org.apache.calcite.plan.Contexts
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.core.RelFactories
 import org.apache.calcite.rel.{RelCollation, RelNode}
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilderFactory
+import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
 import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
@@ -38,6 +39,19 @@ object FlinkRelFactories {
 
   val FLINK_REL_BUILDER: RelBuilderFactory = 
FlinkRelBuilder.proto(Contexts.empty)
 
+  // Because of:
+  // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the 
input,
+  // if the input is a Project.
+  //
+  // the field can not be pruned if it is referenced by other expressions
+  // of the window aggregation(i.e. the TUMBLE_START/END).
+  // To solve this, we config the RelBuilder to forbidden this feature.
+  val LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE: RelBuilderFactory = 
RelBuilder.proto(
+    Contexts.of(
+      RelFactories.DEFAULT_STRUCT,
+      RelBuilder.Config.DEFAULT
+        .withPruneInputOfAggregate(false)))
+
   val DEFAULT_EXPAND_FACTORY = new ExpandFactoryImpl
 
   val DEFAULT_RANK_FACTORY = new RankFactoryImpl
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala
index f6d7f9e..0af6c63 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowAggregateReduceFunctionsRule.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.planner.plan.rules.logical
 
+import org.apache.flink.table.planner.calcite.FlinkRelFactories
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate
 
-import org.apache.calcite.plan.Contexts
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rel.logical.LogicalAggregate
 import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule
 import org.apache.calcite.rex.RexNode
@@ -40,11 +40,7 @@ import scala.collection.JavaConversions._
 class WindowAggregateReduceFunctionsRule
   extends AggregateReduceFunctionsRule(
     operand(classOf[LogicalWindowAggregate], any()),
-    RelBuilder.proto(
-      Contexts.of(
-        RelFactories.DEFAULT_STRUCT,
-        RelBuilder.Config.DEFAULT
-          .withPruneInputOfAggregate(false)))) {
+    FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE) {
 
   override def newAggregateRel(
       relBuilder: RelBuilder,
@@ -52,14 +48,6 @@ class WindowAggregateReduceFunctionsRule
       newCalls: util.List[AggregateCall]): Unit = {
 
     // create a LogicalAggregate with simpler aggregation functions
-
-    // Because of:
-    // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the 
input,
-    // if the input is a Project.
-    //
-    // the field can not be pruned if it is referenced by other expressions
-    // of the window aggregation(i.e. the TUMBLE_START/END).
-    // To solve this, we config the RelBuilder to forbidden this feature.
     super.newAggregateRel(relBuilder, oldAgg, newCalls)
     // pop LogicalAggregate from RelBuilder
     val newAgg = relBuilder.build().asInstanceOf[LogicalAggregate]
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
index bf430cb..847f29b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecWindowAggregateRule.scala
@@ -21,7 +21,7 @@ package 
org.apache.flink.table.planner.plan.rules.physical.batch
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction}
-import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory}
+import org.apache.flink.table.planner.calcite.{FlinkContext, 
FlinkRelFactories, FlinkTypeFactory}
 import 
org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.logical.{LogicalWindow, 
SlidingGroupWindow, TumblingGroupWindow}
@@ -34,13 +34,12 @@ import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat
 import org.apache.flink.table.types.logical.{BigIntType, IntType, LogicalType}
 
 import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{Contexts, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Aggregate.Group
-import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.tools.RelBuilder
 import org.apache.commons.math3.util.ArithmeticUtils
 
 import scala.collection.JavaConversions._
@@ -71,11 +70,7 @@ class BatchExecWindowAggregateRule
   extends RelOptRule(
     operand(classOf[FlinkLogicalWindowAggregate],
       operand(classOf[RelNode], any)),
-    RelBuilder.proto(
-      Contexts.of(
-        RelFactories.DEFAULT_STRUCT,
-        RelBuilder.Config.DEFAULT
-          .withPruneInputOfAggregate(false))),
+    FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE,
     "BatchExecWindowAggregateRule")
   with BatchExecAggRuleBase {
 
@@ -163,13 +158,6 @@ class BatchExecWindowAggregateRule
     // TODO aggregate include projection now, so do not provide new trait will 
be safe
     val aggProvidedTraitSet = 
input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
 
-    // Because of:
-    // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the 
input,
-    // if the input is a Project.
-    //
-    // the field can not be pruned if it is referenced by other expressions
-    // of the window aggregation(i.e. the TUMBLE_START/END).
-    // To solve this, we config the RelBuilder to forbidden this feature.
     val inputTimeFieldIndex = AggregateUtil.timeFieldIndex(
       input.getRowType, call.builder(), window.timeAttribute)
     val inputTimeFieldType = 
agg.getInput.getRowType.getFieldList.get(inputTimeFieldIndex).getType
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
index d36ed2b..157aadbb 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
@@ -426,10 +426,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], v=[$3], a0=[$4], 
b0=[$5], c0=[$6], v0=[$7
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashJoin(joinType=[InnerJoin], where=[=(f0, f00)], select=[a, b, c, f0, a0, 
b0, c0, f00], isBroadcast=[true], build=[right])
-:- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor0.c,_UTF-16LE'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
-:  +- TableSourceScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
+HashJoin(joinType=[InnerJoin], where=[=(f0, f00)], select=[a, b, c, f0, a0, 
b0, c0, f00], build=[right])
+:- Exchange(distribution=[hash[f0]])
+:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor0.c,_UTF-16LE'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+:     +- TableSourceScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[f0]])
    +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor1.c,_UTF-16LE'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
       +- TableSourceScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -459,12 +460,13 @@ LogicalProject(a=[$0], b=[$1], c=[$2], s=[$3], a0=[$4], 
b0=[$5], c0=[$6], s0=[$7
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[=(c, f00)], select=[a, b, c, f0, 
a0, b0, c0, f00], build=[left])
-:- Exchange(distribution=[broadcast])
-:  +- Correlate(invocation=[TableFun($cor0.c)], 
correlate=[table(TableFun($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+HashJoin(joinType=[InnerJoin], where=[=(c, f00)], select=[a, b, c, f0, a0, b0, 
c0, f00], build=[right])
+:- Correlate(invocation=[TableFun($cor0.c)], 
correlate=[table(TableFun($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+:  +- Exchange(distribution=[hash[c]])
 :     +- TableSourceScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Correlate(invocation=[TableFun($cor1.c)], 
correlate=[table(TableFun($cor1.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
-   +- TableSourceScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[f0]])
+   +- Correlate(invocation=[TableFun($cor1.c)], 
correlate=[table(TableFun($cor1.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+      +- TableSourceScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1093,17 +1095,16 @@ LogicalIntersect(all=[false])
 NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, 
random0)], select=[random], build=[right])
 :- SortAggregate(isMerge=[false], groupBy=[random], select=[random])
 :  +- Sort(orderBy=[random ASC])
-:     +- Exchange(distribution=[hash[random]])
-:        +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT 
FROM(random, random0)], select=[random], build=[right])
-:           :- Exchange(distribution=[any], shuffle_mode=[BATCH])
-:           :  +- Calc(select=[random], reuse_id=[1])
-:           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], 
global=[true])
-:           :        +- Exchange(distribution=[single])
-:           :           +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], 
fetch=[1], global=[false])
-:           :              +- Calc(select=[a AS random, RAND() AS EXPR$1])
-:           :                 +- TableSourceScan(table=[[default_catalog, 
default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:           +- Exchange(distribution=[broadcast], reuse_id=[2])
-:              +- Reused(reference_id=[1])
+:     +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT 
FROM(random, random0)], select=[random], build=[right])
+:        :- Exchange(distribution=[hash[random]], shuffle_mode=[BATCH])
+:        :  +- Calc(select=[random], reuse_id=[1])
+:        :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], 
global=[true])
+:        :        +- Exchange(distribution=[single])
+:        :           +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], 
global=[false])
+:        :              +- Calc(select=[a AS random, RAND() AS EXPR$1])
+:        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:        +- Exchange(distribution=[broadcast], reuse_id=[2])
+:           +- Reused(reference_id=[1])
 +- Reused(reference_id=[2])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index 062a047..73f0151 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -717,7 +717,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
 +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS 
EXPR$1])
    +- Exchange(distribution=[hash[a]])
       +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
-         +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+         +- Calc(select=[a, b])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -741,7 +741,7 @@ LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
 Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
 +- HashAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
    +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+      +- Calc(select=[a, b])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -766,7 +766,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
 +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MAX(max$0) AS 
EXPR$1])
    +- Exchange(distribution=[hash[a]])
       +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS max$0])
-         +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+         +- Calc(select=[a, b])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index b9b901d..3f3e03f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -737,7 +737,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
       +- Exchange(distribution=[hash[a]])
          +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS 
max$0])
             +- Sort(orderBy=[a ASC])
-               +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+               +- Calc(select=[a, b])
                   +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
@@ -762,7 +762,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
 +- SortAggregate(isMerge=[false], groupBy=[a], select=[a, MAX(b) AS EXPR$1])
    +- Sort(orderBy=[a ASC])
       +- Exchange(distribution=[hash[a]])
-         +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+         +- Calc(select=[a, b])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -789,7 +789,7 @@ Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
       +- Exchange(distribution=[hash[a]])
          +- LocalSortAggregate(groupBy=[a], select=[a, Partial_MAX(b) AS 
max$0])
             +- Sort(orderBy=[a ASC])
-               +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+               +- Calc(select=[a, b])
                   +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
index cd2c9eb..cbd5d3e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
@@ -1399,19 +1399,19 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 
100))])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], 
isBroadcast=[true], build=[left])
-:- Exchange(distribution=[broadcast])
-:  +- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], 
isBroadcast=[true], build=[right])
-:     :- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], 
isBroadcast=[true], build=[right])
-:     :  :- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
-:     :  :  +- TableSourceScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:     :  +- Exchange(distribution=[broadcast])
-:     :     +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
-:     :        +- TableSourceScan(table=[[default_catalog, default_database, 
t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
-:     +- Exchange(distribution=[broadcast])
-:        +- Calc(select=[i], where=[<(j, 100)])
-:           +- Reused(reference_id=[1])
-+- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], 
isBroadcast=[true], build=[right])
+:- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, 
f], isBroadcast=[true], build=[right])
+:  :- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, 
f], isBroadcast=[true], build=[left])
+:  :  :- Exchange(distribution=[broadcast])
+:  :  :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
+:  :  :     +- TableSourceScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:  :  +- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+:  +- Exchange(distribution=[broadcast])
+:     +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
+:        +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[i], where=[<(j, 100)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -1854,28 +1854,26 @@ Calc(select=[b])
    :     :  :     :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :     :  :     :        +- Exchange(distribution=[single])
    :     :  :     :           +- LocalSortAggregate(select=[Partial_COUNT(*) 
AS count1$0, Partial_COUNT(i) AS count$1])
-   :     :  :     :              +- Calc(select=[i])
-   :     :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+   :     :  :     :              +- Calc(select=[i], reuse_id=[1])
+   :     :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2])
    :     :  :     +- Exchange(distribution=[broadcast])
    :     :  :        +- Calc(select=[i, true AS i0])
    :     :  :           +- HashAggregate(isMerge=[true], groupBy=[i], 
select=[i])
    :     :  :              +- Exchange(distribution=[hash[i]])
    :     :  :                 +- LocalHashAggregate(groupBy=[i], select=[i])
-   :     :  :                    +- Calc(select=[i, true AS i0])
-   :     :  :                       +- Reused(reference_id=[1])
+   :     :  :                    +- Reused(reference_id=[1])
    :     :  +- Exchange(distribution=[broadcast])
    :     :     +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) 
AS c, Final_COUNT(count$1) AS ck])
    :     :        +- Exchange(distribution=[single])
    :     :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0, Partial_COUNT(EXPR$0) AS count$1])
-   :     :              +- Calc(select=[CAST(j) AS EXPR$0])
-   :     :                 +- Reused(reference_id=[1])
+   :     :              +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3])
+   :     :                 +- Reused(reference_id=[2])
    :     +- Exchange(distribution=[broadcast])
    :        +- Calc(select=[EXPR$0, true AS i])
    :           +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :              +- Exchange(distribution=[hash[EXPR$0]])
    :                 +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
-   :                    +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[3])
    +- Exchange(distribution=[broadcast])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
index 8fe210d..b4fc401 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
@@ -174,17 +174,16 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], 
b2=[$4], c2=[$5], a3=[$6], b3
       <![CDATA[
 Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
 +- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, 
b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
-   :- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, 
c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+   :- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, 
a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
    :  :- TableSourceScan(table=[[default_catalog, default_database, T2, 
source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
    :  +- Exchange(distribution=[broadcast])
-   :     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, a3)], 
select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right])
+   :     +- HashJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, 
c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
    :        :- TableSourceScan(table=[[default_catalog, default_database, T5, 
source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
    :        +- Exchange(distribution=[broadcast])
-   :           +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)])
-   :              +- HashJoin(joinType=[InnerJoin], where=[=(a3, a1)], 
select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
-   :                 :- TableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, 
c1])
-   :                 +- Exchange(distribution=[broadcast])
-   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, 
c3])
+   :           +- HashJoin(joinType=[InnerJoin], where=[=(a3, a1)], 
select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])
+   :              :- TableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, 
c1])
+   :              +- Exchange(distribution=[broadcast])
+   :                 +- TableSourceScan(table=[[default_catalog, 
default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, 
c3])
    +- Exchange(distribution=[broadcast])
       +- TableSourceScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
 ]]>
@@ -216,16 +215,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], 
b2=[$4], c2=[$5], a3=[$6], b3
       <![CDATA[
 Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
 +- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, 
b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
-   :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, 
a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right])
+   :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, 
a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[left])
    :  :- Exchange(distribution=[hash[b1]])
    :  :  +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
    :  +- Exchange(distribution=[hash[b4]])
    :     +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, 
c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])
-   :        :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)])
-   :        :  +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], 
select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
-   :        :     :- TableSourceScan(table=[[default_catalog, 
default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, 
c5])
-   :        :     +- Exchange(distribution=[broadcast])
-   :        :        +- TableSourceScan(table=[[default_catalog, 
default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, 
c2])
+   :        :- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, 
b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
+   :        :  :- TableSourceScan(table=[[default_catalog, default_database, 
T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        :  +- Exchange(distribution=[broadcast])
+   :        :     +- TableSourceScan(table=[[default_catalog, 
default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, 
c2])
    :        +- Exchange(distribution=[broadcast])
    :           +- TableSourceScan(table=[[default_catalog, default_database, 
T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
    +- Exchange(distribution=[broadcast])
@@ -270,11 +268,10 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, 
c4, a5, b5, c5])
    :  +- Exchange(distribution=[single])
    :     +- TableSourceScan(table=[[default_catalog, default_database, T3, 
source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
    +- Exchange(distribution=[broadcast])
-      +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)])
-         +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, 
c5, a4, b4, c4], isBroadcast=[true], build=[right])
-            :- TableSourceScan(table=[[default_catalog, default_database, T5, 
source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
-            +- Exchange(distribution=[broadcast])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+      +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, 
a4, b4, c4], isBroadcast=[true], build=[right])
+         :- TableSourceScan(table=[[default_catalog, default_database, T5, 
source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+         +- Exchange(distribution=[broadcast])
+            +- TableSourceScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
 ]]>
     </Resource>
   </TestCase>
@@ -348,7 +345,7 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], 
c2=[$5], a3=[$6], b3
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
-+- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, 
a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], build=[right])
++- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, 
b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
    :- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, 
a2, b2, c2, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
    :  :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, 
b1, c1, a2, b2, c2], build=[right])
    :  :  :- Exchange(distribution=[hash[a1]])
@@ -356,11 +353,10 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, 
c4, a5, b5, c5])
    :  :  +- Exchange(distribution=[hash[a2]])
    :  :     +- TableSourceScan(table=[[default_catalog, default_database, T2, 
source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
    :  +- Exchange(distribution=[broadcast])
-   :     +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)])
-   :        +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, 
b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
-   :           :- TableSourceScan(table=[[default_catalog, default_database, 
T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
-   :           +- Exchange(distribution=[broadcast])
-   :              +- TableSourceScan(table=[[default_catalog, 
default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, 
c4])
+   :     +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, 
c5, a4, b4, c4], isBroadcast=[true], build=[right])
+   :        :- TableSourceScan(table=[[default_catalog, default_database, T5, 
source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        +- Exchange(distribution=[broadcast])
+   :           +- TableSourceScan(table=[[default_catalog, default_database, 
T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
    +- Exchange(distribution=[broadcast])
       +- TableSourceScan(table=[[default_catalog, default_database, T3, 
source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
 ]]>
@@ -408,11 +404,10 @@ Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, 
c3, a5, b5, c5])
    :        :- Calc(select=[a5, b5, c5], where=[<(b5, 15)])
    :        :  +- TableSourceScan(table=[[default_catalog, default_database, 
T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
    :        +- Exchange(distribution=[broadcast])
-   :           +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)])
-   :              +- HashJoin(joinType=[InnerJoin], where=[AND(=(a3, a1), 
<(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], 
build=[right])
-   :                 :- TableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, 
c1])
-   :                 +- Exchange(distribution=[broadcast])
-   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, 
c3])
+   :           +- HashJoin(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, 
b3), 2000))], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], 
build=[right])
+   :              :- TableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, 
c1])
+   :              +- Exchange(distribution=[broadcast])
+   :                 +- TableSourceScan(table=[[default_catalog, 
default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, 
c3])
    +- Exchange(distribution=[broadcast])
       +- TableSourceScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
 ]]>
@@ -538,16 +533,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], 
b2=[$4], c2=[$5], a3=[$6], b3
       <![CDATA[
 Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
 +- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, 
b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
-   :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, 
a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right])
+   :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, 
a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[left])
    :  :- Exchange(distribution=[hash[b1]])
    :  :  +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
    :  +- Exchange(distribution=[hash[b4]])
    :     +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, 
c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])
-   :        :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)])
-   :        :  +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], 
select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
-   :        :     :- TableSourceScan(table=[[default_catalog, 
default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, 
c5])
-   :        :     +- Exchange(distribution=[broadcast])
-   :        :        +- TableSourceScan(table=[[default_catalog, 
default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, 
c2])
+   :        :- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, 
b5, c5, a2, b2, c2], isBroadcast=[true], build=[right])
+   :        :  :- TableSourceScan(table=[[default_catalog, default_database, 
T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+   :        :  +- Exchange(distribution=[broadcast])
+   :        :     +- TableSourceScan(table=[[default_catalog, 
default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, 
c2])
    :        +- Exchange(distribution=[broadcast])
    :           +- TableSourceScan(table=[[default_catalog, default_database, 
T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
    +- Exchange(distribution=[broadcast])
@@ -583,17 +577,16 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, 
c4, a5, b5, c5])
 +- HashJoin(joinType=[InnerJoin], where=[=(c1, c2)], select=[a1, b1, c1, a2, 
b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
    :- TableSourceScan(table=[[default_catalog, default_database, T1, source: 
[TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
    +- Exchange(distribution=[broadcast])
-      +- HashJoin(joinType=[InnerJoin], where=[=(c2, c5)], select=[a2, b2, c2, 
a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
-         :- TableSourceScan(table=[[default_catalog, default_database, T2, 
source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
-         +- Exchange(distribution=[broadcast])
-            +- HashJoin(joinType=[InnerJoin], where=[=(c3, c4)], select=[a5, 
b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
-               :- Calc(select=[a5, b5, c5, a3, b3, c3], where=[=(c5, c3)])
-               :  +- HashJoin(joinType=[InnerJoin], where=[=(c5, c3)], 
select=[a5, b5, c5, a3, b3, c3], isBroadcast=[true], build=[right])
-               :     :- TableSourceScan(table=[[default_catalog, 
default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, 
c5])
-               :     +- Exchange(distribution=[broadcast])
-               :        +- TableSourceScan(table=[[default_catalog, 
default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, 
c3])
-               +- Exchange(distribution=[broadcast])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, 
c4])
+      +- HashJoin(joinType=[InnerJoin], where=[=(c2, c5)], select=[a2, b2, c2, 
a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[left])
+         :- Exchange(distribution=[broadcast])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T2, 
source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+         +- HashJoin(joinType=[InnerJoin], where=[=(c3, c4)], select=[a5, b5, 
c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+            :- HashJoin(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, 
b5, c5, a3, b3, c3], isBroadcast=[true], build=[right])
+            :  :- TableSourceScan(table=[[default_catalog, default_database, 
T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+            :  +- Exchange(distribution=[broadcast])
+            :     +- TableSourceScan(table=[[default_catalog, 
default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, 
c3])
+            +- Exchange(distribution=[broadcast])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
index fa78f5d..02364c1 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
@@ -508,27 +508,27 @@ LogicalFilter(condition=[<>($cor0.b, $1)])
     <Resource name="planAfter">
       <![CDATA[
 NestedLoopJoin(joinType=[LeftAntiJoin], where=[<>(b, e)], select=[a, b, c], 
build=[right])
-:- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), 
IS NULL(i)), =(c, k))], select=[a, b, c], build=[right])
-:  :- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], 
build=[right], singleRowJoin=[true])
+:- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], 
build=[right], singleRowJoin=[true])
+:  :- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS 
NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], build=[right])
 :  :  :- NestedLoopJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, 
b, c], build=[right])
 :  :  :  :- TableSourceScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 :  :  :  +- Exchange(distribution=[broadcast])
 :  :  :     +- Calc(select=[d])
 :  :  :        +- TableSourceScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], reuse_id=[1])
 :  :  +- Exchange(distribution=[broadcast])
-:  :     +- Calc(select=[IS NOT NULL(m) AS $f0])
-:  :        +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m])
-:  :           +- Exchange(distribution=[single])
-:  :              +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0])
-:  :                 +- Calc(select=[true AS i])
-:  :                    +- HashAggregate(isMerge=[true], groupBy=[l], 
select=[l])
-:  :                       +- Exchange(distribution=[hash[l]])
-:  :                          +- LocalHashAggregate(groupBy=[l], select=[l])
-:  :                             +- Calc(select=[l], where=[LIKE(n, 
_UTF-16LE'Test')])
-:  :                                +- 
TableSourceScan(table=[[default_catalog, default_database, t2, source: 
[TestTableSource(l, m, n)]]], fields=[l, m, n])
+:  :     +- Calc(select=[i, k], where=[>(i, 10)])
+:  :        +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
 :  +- Exchange(distribution=[broadcast])
-:     +- Calc(select=[i, k], where=[>(i, 10)])
-:        +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+:     +- Calc(select=[IS NOT NULL(m) AS $f0])
+:        +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m])
+:           +- Exchange(distribution=[single])
+:              +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0])
+:                 +- Calc(select=[true AS i])
+:                    +- HashAggregate(isMerge=[true], groupBy=[l], select=[l])
+:                       +- Exchange(distribution=[hash[l]])
+:                          +- LocalHashAggregate(groupBy=[l], select=[l])
+:                             +- Calc(select=[l], where=[LIKE(n, 
_UTF-16LE'Test')])
+:                                +- TableSourceScan(table=[[default_catalog, 
default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n])
 +- Exchange(distribution=[broadcast])
    +- Calc(select=[e])
       +- Reused(reference_id=[1])
@@ -679,15 +679,14 @@ Calc(select=[b])
    :     :     +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) 
AS c])
    :     :        +- Exchange(distribution=[single])
    :     :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0])
-   :     :              +- Calc(select=[1 AS EXPR$0])
-   :     :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], 
reuse_id=[1])
+   :     :              +- Calc(select=[1 AS EXPR$0], reuse_id=[1])
+   :     :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
    :     +- Exchange(distribution=[broadcast])
    :        +- Calc(select=[true AS i])
    :           +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :              +- Exchange(distribution=[hash[EXPR$0]])
    :                 +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
-   :                    +- Calc(select=[1 AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[1])
    +- Exchange(distribution=[broadcast])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -1204,15 +1203,14 @@ Calc(select=[b])
    :     :     +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) 
AS c])
    :     :        +- Exchange(distribution=[single])
    :     :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0])
-   :     :              +- Calc(select=[1 AS EXPR$0])
-   :     :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], 
reuse_id=[1])
+   :     :              +- Calc(select=[1 AS EXPR$0], reuse_id=[1])
+   :     :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
    :     +- Exchange(distribution=[broadcast])
    :        +- Calc(select=[true AS i])
    :           +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :              +- Exchange(distribution=[hash[EXPR$0]])
    :                 +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
-   :                    +- Calc(select=[1 AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[1])
    +- Exchange(distribution=[broadcast])
       +- Calc(select=[d])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -1872,19 +1870,19 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 
100))])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, 
f], build=[left])
-:- Exchange(distribution=[broadcast])
-:  +- NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, 
c], build=[right])
-:     :- NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, 
b, c], build=[right])
-:     :  :- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
-:     :  :  +- TableSourceScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:     :  +- Exchange(distribution=[broadcast])
-:     :     +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
-:     :        +- TableSourceScan(table=[[default_catalog, default_database, 
t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
-:     +- Exchange(distribution=[broadcast])
-:        +- Calc(select=[i], where=[<(j, 100)])
-:           +- Reused(reference_id=[1])
-+- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, 
e, f], build=[right])
+:- NestedLoopJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, 
d, e, f], build=[right])
+:  :- NestedLoopJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, 
d, e, f], build=[left])
+:  :  :- Exchange(distribution=[broadcast])
+:  :  :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
+:  :  :     +- TableSourceScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:  :  +- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+:  +- Exchange(distribution=[broadcast])
+:     +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
+:        +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[i], where=[<(j, 100)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -2250,28 +2248,26 @@ Calc(select=[b])
    :     :  :     :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :     :  :     :        +- Exchange(distribution=[single])
    :     :  :     :           +- LocalSortAggregate(select=[Partial_COUNT(*) 
AS count1$0, Partial_COUNT(i) AS count$1])
-   :     :  :     :              +- Calc(select=[i])
-   :     :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+   :     :  :     :              +- Calc(select=[i], reuse_id=[1])
+   :     :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2])
    :     :  :     +- Exchange(distribution=[broadcast])
    :     :  :        +- Calc(select=[i, true AS i0])
    :     :  :           +- HashAggregate(isMerge=[true], groupBy=[i], 
select=[i])
    :     :  :              +- Exchange(distribution=[hash[i]])
    :     :  :                 +- LocalHashAggregate(groupBy=[i], select=[i])
-   :     :  :                    +- Calc(select=[i, true AS i0])
-   :     :  :                       +- Reused(reference_id=[1])
+   :     :  :                    +- Reused(reference_id=[1])
    :     :  +- Exchange(distribution=[broadcast])
    :     :     +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) 
AS c, Final_COUNT(count$1) AS ck])
    :     :        +- Exchange(distribution=[single])
    :     :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0, Partial_COUNT(EXPR$0) AS count$1])
-   :     :              +- Calc(select=[CAST(j) AS EXPR$0])
-   :     :                 +- Reused(reference_id=[1])
+   :     :              +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3])
+   :     :                 +- Reused(reference_id=[2])
    :     +- Exchange(distribution=[broadcast])
    :        +- Calc(select=[EXPR$0, true AS i])
    :           +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :              +- Exchange(distribution=[hash[EXPR$0]])
    :                 +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
-   :                    +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[3])
    +- Exchange(distribution=[broadcast])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
index a9dfe9d..52e98cc 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
@@ -100,9 +100,10 @@ LogicalFilter(condition=[=($cor1.a, $0)])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], 
isBroadcast=[true], build=[right])
-:- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], 
build=[right])
+:- Exchange(distribution=[hash[a]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
    +- Calc(select=[d])
       +- Correlate(invocation=[table_func($cor0.f)], 
correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], 
rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, 
VARCHAR(2147483647) f0)], joinType=[INNER])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -516,29 +517,29 @@ LogicalFilter(condition=[<>($cor0.b, $1)])
     <Resource name="planAfter">
       <![CDATA[
 NestedLoopJoin(joinType=[LeftAntiJoin], where=[<>(b, e)], select=[a, b, c], 
build=[right])
-:- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS 
NULL(i)), =(c, k))], select=[a, b, c], build=[left])
-:  :- Exchange(distribution=[hash[c]])
-:  :  +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, 
c], build=[right], singleRowJoin=[true])
-:  :     :- HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, 
c], build=[right])
-:  :     :  :- Exchange(distribution=[hash[a]])
-:  :     :  :  +- TableSourceScan(table=[[default_catalog, default_database, 
l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:  :     :  +- Exchange(distribution=[hash[d]])
-:  :     :     +- Calc(select=[d])
-:  :     :        +- TableSourceScan(table=[[default_catalog, 
default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], 
reuse_id=[1])
-:  :     +- Exchange(distribution=[broadcast])
-:  :        +- Calc(select=[IS NOT NULL(m) AS $f0])
-:  :           +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m])
-:  :              +- Exchange(distribution=[single])
-:  :                 +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0])
-:  :                    +- Calc(select=[true AS i])
-:  :                       +- HashAggregate(isMerge=[true], groupBy=[l], 
select=[l])
-:  :                          +- Exchange(distribution=[hash[l]])
-:  :                             +- LocalHashAggregate(groupBy=[l], select=[l])
-:  :                                +- Calc(select=[l], where=[LIKE(n, 
_UTF-16LE'Test')])
-:  :                                   +- 
TableSourceScan(table=[[default_catalog, default_database, t2, source: 
[TestTableSource(l, m, n)]]], fields=[l, m, n])
-:  +- Exchange(distribution=[hash[k]])
-:     +- Calc(select=[i, k], where=[>(i, 10)])
-:        +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+:- NestedLoopJoin(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], 
build=[right], singleRowJoin=[true])
+:  :- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS 
NULL(i)), =(c, k))], select=[a, b, c], build=[right])
+:  :  :- Exchange(distribution=[hash[c]])
+:  :  :  +- HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, 
c], build=[right])
+:  :  :     :- Exchange(distribution=[hash[a]])
+:  :  :     :  +- TableSourceScan(table=[[default_catalog, default_database, 
l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:  :  :     +- Exchange(distribution=[hash[d]])
+:  :  :        +- Calc(select=[d])
+:  :  :           +- TableSourceScan(table=[[default_catalog, 
default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], 
reuse_id=[1])
+:  :  +- Exchange(distribution=[hash[k]])
+:  :     +- Calc(select=[i, k], where=[>(i, 10)])
+:  :        +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+:  +- Exchange(distribution=[broadcast])
+:     +- Calc(select=[IS NOT NULL(m) AS $f0])
+:        +- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS m])
+:           +- Exchange(distribution=[single])
+:              +- LocalHashAggregate(select=[Partial_MIN(i) AS min$0])
+:                 +- Calc(select=[true AS i])
+:                    +- HashAggregate(isMerge=[true], groupBy=[l], select=[l])
+:                       +- Exchange(distribution=[hash[l]])
+:                          +- LocalHashAggregate(groupBy=[l], select=[l])
+:                             +- Calc(select=[l], where=[LIKE(n, 
_UTF-16LE'Test')])
+:                                +- TableSourceScan(table=[[default_catalog, 
default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n])
 +- Exchange(distribution=[broadcast])
    +- Calc(select=[e])
       +- Reused(reference_id=[1])
@@ -694,15 +695,14 @@ Calc(select=[b])
    :        :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c])
    :        :        +- Exchange(distribution=[single])
    :        :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0])
-   :        :              +- Calc(select=[1 AS EXPR$0])
-   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], 
reuse_id=[1])
+   :        :              +- Calc(select=[1 AS EXPR$0], reuse_id=[1])
+   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
    :        +- Exchange(distribution=[broadcast])
    :           +- Calc(select=[true AS i])
    :              +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :                 +- Exchange(distribution=[hash[EXPR$0]])
    :                    +- LocalHashAggregate(groupBy=[EXPR$0], 
select=[EXPR$0])
-   :                       +- Calc(select=[1 AS EXPR$0, true AS i])
-   :                          +- Reused(reference_id=[1])
+   :                       +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d, f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -839,9 +839,10 @@ LogicalProject(f1=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashJoin(joinType=[LeftSemiJoin], where=[AND(=(c, f1), =(a, d))], select=[a, 
b, c], isBroadcast=[true], build=[right])
-:- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
+HashJoin(joinType=[LeftSemiJoin], where=[AND(=(c, f1), =(a, d))], select=[a, 
b, c], build=[right])
+:- Exchange(distribution=[hash[c, a]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[f1, d]])
    +- Calc(select=[f0 AS f1, d])
       +- Correlate(invocation=[table_func($cor0.f)], 
correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], 
rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, 
VARCHAR(2147483647) f0)], joinType=[INNER])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -1238,15 +1239,14 @@ Calc(select=[b])
    :        :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c])
    :        :        +- Exchange(distribution=[single])
    :        :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0])
-   :        :              +- Calc(select=[1 AS EXPR$0])
-   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], 
reuse_id=[1])
+   :        :              +- Calc(select=[1 AS EXPR$0], reuse_id=[1])
+   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
    :        +- Exchange(distribution=[broadcast])
    :           +- Calc(select=[true AS i])
    :              +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :                 +- Exchange(distribution=[hash[EXPR$0]])
    :                    +- LocalHashAggregate(groupBy=[EXPR$0], 
select=[EXPR$0])
-   :                       +- Calc(select=[1 AS EXPR$0, true AS i])
-   :                          +- Reused(reference_id=[1])
+   :                       +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d]])
       +- Calc(select=[d])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -1427,9 +1427,10 @@ LogicalProject(f1=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashJoin(joinType=[LeftSemiJoin], where=[=(c, f1)], select=[a, b, c], 
isBroadcast=[true], build=[right])
-:- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
+HashJoin(joinType=[LeftSemiJoin], where=[=(c, f1)], select=[a, b, c], 
build=[right])
+:- Exchange(distribution=[hash[c]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[f1]])
    +- Calc(select=[f0 AS f1])
       +- Correlate(invocation=[table_func($cor0.f)], 
correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], 
rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, 
VARCHAR(2147483647) f0)], joinType=[INNER])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -1927,21 +1928,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 
100))])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], 
build=[left])
-:- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], 
build=[right])
-:  :- Exchange(distribution=[hash[a]])
-:  :  +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], 
build=[right])
-:  :     :- Exchange(distribution=[hash[b]])
-:  :     :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
-:  :     :     +- TableSourceScan(table=[[default_catalog, default_database, 
l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:  :     +- Exchange(distribution=[hash[j]])
-:  :        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
-:  :           +- TableSourceScan(table=[[default_catalog, default_database, 
t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
-:  +- Exchange(distribution=[hash[i]])
-:     +- Calc(select=[i], where=[<(j, 100)])
-:        +- Reused(reference_id=[1])
-+- Exchange(distribution=[hash[d]])
-   +- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], 
build=[right])
+:- Exchange(distribution=[hash[a]])
+:  +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, 
e, f], build=[right])
+:     :- Exchange(distribution=[hash[b]])
+:     :  +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, 
d, e, f], build=[left])
+:     :     :- Exchange(distribution=[hash[a]])
+:     :     :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
+:     :     :     +- TableSourceScan(table=[[default_catalog, 
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:     :     +- Exchange(distribution=[hash[d]])
+:     :        +- TableSourceScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+:     +- Exchange(distribution=[hash[j]])
+:        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
+:           +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
++- Exchange(distribution=[hash[i]])
+   +- Calc(select=[i], where=[<(j, 100)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -2315,26 +2317,24 @@ Calc(select=[b])
    :        :  :     :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :        :  :     :        +- Exchange(distribution=[single])
    :        :  :     :           +- 
LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS 
count$1])
-   :        :  :     :              +- Calc(select=[i])
-   :        :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+   :        :  :     :              +- Calc(select=[i], reuse_id=[1])
+   :        :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2])
    :        :  :     +- Calc(select=[i, true AS i0])
    :        :  :        +- HashAggregate(isMerge=[true], groupBy=[i], 
select=[i])
    :        :  :           +- Exchange(distribution=[hash[i]])
    :        :  :              +- LocalHashAggregate(groupBy=[i], select=[i])
-   :        :  :                 +- Calc(select=[i, true AS i0])
-   :        :  :                    +- Reused(reference_id=[1])
+   :        :  :                 +- Reused(reference_id=[1])
    :        :  +- Exchange(distribution=[broadcast])
    :        :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :        :        +- Exchange(distribution=[single])
    :        :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0, Partial_COUNT(EXPR$0) AS count$1])
-   :        :              +- Calc(select=[CAST(j) AS EXPR$0])
-   :        :                 +- Reused(reference_id=[1])
+   :        :              +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3])
+   :        :                 +- Reused(reference_id=[2])
    :        +- Calc(select=[EXPR$0, true AS i])
    :           +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :              +- Exchange(distribution=[hash[EXPR$0]])
    :                 +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
-   :                    +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[3])
    +- Exchange(distribution=[hash[f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
index 5100b1c..07ac0fb 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
@@ -82,6 +82,34 @@ HashJoin(joinType=[LeftSemiJoin], where=[AND(=(b, e), =(c, 
f))], select=[a, b, c
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testExistsWithCorrelated_LateralTableInSubQuery">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT * FROM r, LATERAL 
TABLE(table_func(f)) AS T(f1) WHERE a = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[=($cor1.a, $0)])
+  LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
+    LogicalTableScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]])
+    LogicalTableFunctionScan(invocation=[table_func($cor0.f)], 
rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class 
[Ljava.lang.Object;])
+})], variablesSet=[[$cor1]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], 
build=[right])
+:- Exchange(distribution=[hash[a]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
+   +- Calc(select=[d])
+      +- Correlate(invocation=[table_func($cor0.f)], 
correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], 
rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+         +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testExistsWithCorrelated_OverInSubQuery">
     <Resource name="sql">
       <![CDATA[SELECT * FROM l WHERE EXISTS (SELECT MAX(r.e) OVER() FROM r 
WHERE l.c = r.f GROUP BY r.e)]]>
@@ -639,6 +667,63 @@ Calc(select=[c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testInWithUncorrelated_LateralTableInSubQuery">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM l WHERE c IN (SELECT f1 FROM r, LATERAL 
TABLE(table_func(f)) AS T(f1))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(f1=[$3])
+  LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
+    LogicalTableScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]])
+    LogicalTableFunctionScan(invocation=[table_func($cor0.f)], 
rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class 
[Ljava.lang.Object;])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(c, f1)], select=[a, b, c], 
build=[right])
+:- Exchange(distribution=[hash[c]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[f1]])
+   +- Calc(select=[f0 AS f1])
+      +- Correlate(invocation=[table_func($cor0.f)], 
correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], 
rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+         +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInWithCorrelated_LateralTableInSubQuery">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM l WHERE c IN (SELECT f1 FROM r, LATERAL 
TABLE(table_func(f)) AS T(f1) WHERE a = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(f1=[$3])
+  LogicalFilter(condition=[=($cor1.a, $0)])
+    LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
+      LogicalTableScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]])
+      LogicalTableFunctionScan(invocation=[table_func($cor0.f)], 
rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class 
[Ljava.lang.Object;])
+})], variablesSet=[[$cor1]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[AND(=(c, f1), =(a, d))], select=[a, 
b, c], build=[right])
+:- Exchange(distribution=[hash[c, a]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[f1, d]])
+   +- Calc(select=[f0 AS f1, d])
+      +- Correlate(invocation=[table_func($cor0.f)], 
correlate=[table(table_func($cor0.f))], select=[d,e,f,f0], 
rowType=[RecordType(INTEGER d, BIGINT e, VARCHAR(2147483647) f, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+         +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testInWithCorrelated_MultiFields">
     <Resource name="sql">
       <![CDATA[SELECT * FROM l WHERE (a, SUBSTRING(c, 1, 5)) IN (SELECT d, 
SUBSTRING(f, 1, 5) FROM r WHERE l.b = r.e)]]>
@@ -1435,21 +1520,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 
100))])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], 
build=[left])
-:- HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], 
build=[right])
-:  :- Exchange(distribution=[hash[a]])
-:  :  +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], 
build=[right])
-:  :     :- Exchange(distribution=[hash[b]])
-:  :     :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
-:  :     :     +- TableSourceScan(table=[[default_catalog, default_database, 
l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:  :     +- Exchange(distribution=[hash[j]])
-:  :        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
-:  :           +- TableSourceScan(table=[[default_catalog, default_database, 
t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
-:  +- Exchange(distribution=[hash[i]])
-:     +- Calc(select=[i], where=[<(j, 100)])
-:        +- Reused(reference_id=[1])
-+- Exchange(distribution=[hash[d]])
-   +- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], 
build=[right])
+:- Exchange(distribution=[hash[a]])
+:  +- HashJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, 
e, f], build=[right])
+:     :- Exchange(distribution=[hash[b]])
+:     :  +- HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, 
d, e, f], build=[left])
+:     :     :- Exchange(distribution=[hash[a]])
+:     :     :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
+:     :     :     +- TableSourceScan(table=[[default_catalog, 
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:     :     +- Exchange(distribution=[hash[d]])
+:     :        +- TableSourceScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+:     +- Exchange(distribution=[hash[j]])
+:        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
+:           +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
++- Exchange(distribution=[hash[i]])
+   +- Calc(select=[i], where=[<(j, 100)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -1909,26 +1995,24 @@ Calc(select=[b])
    :        :  :     :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :        :  :     :        +- Exchange(distribution=[single])
    :        :  :     :           +- 
LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS 
count$1])
-   :        :  :     :              +- Calc(select=[i])
-   :        :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+   :        :  :     :              +- Calc(select=[i], reuse_id=[1])
+   :        :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2])
    :        :  :     +- Calc(select=[i, true AS i0])
    :        :  :        +- HashAggregate(isMerge=[true], groupBy=[i], 
select=[i])
    :        :  :           +- Exchange(distribution=[hash[i]])
    :        :  :              +- LocalHashAggregate(groupBy=[i], select=[i])
-   :        :  :                 +- Calc(select=[i, true AS i0])
-   :        :  :                    +- Reused(reference_id=[1])
+   :        :  :                 +- Reused(reference_id=[1])
    :        :  +- Exchange(distribution=[broadcast])
    :        :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :        :        +- Exchange(distribution=[single])
    :        :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0, Partial_COUNT(EXPR$0) AS count$1])
-   :        :              +- Calc(select=[CAST(j) AS EXPR$0])
-   :        :                 +- Reused(reference_id=[1])
+   :        :              +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3])
+   :        :                 +- Reused(reference_id=[2])
    :        +- Calc(select=[EXPR$0, true AS i])
    :           +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :              +- Exchange(distribution=[hash[EXPR$0]])
    :                 +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
-   :                    +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[3])
    +- Exchange(distribution=[hash[f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
index 54bb720..b9c22aa 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
@@ -1520,21 +1520,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 
100))])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortMergeJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f])
-:- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c])
-:  :- Exchange(distribution=[hash[a]])
-:  :  +- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, 
c])
-:  :     :- Exchange(distribution=[hash[b]])
-:  :     :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
-:  :     :     +- TableSourceScan(table=[[default_catalog, default_database, 
l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:  :     +- Exchange(distribution=[hash[j]])
-:  :        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
-:  :           +- TableSourceScan(table=[[default_catalog, default_database, 
t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
-:  +- Exchange(distribution=[hash[i]])
-:     +- Calc(select=[i], where=[<(j, 100)])
-:        +- Reused(reference_id=[1])
-+- Exchange(distribution=[hash[d]])
-   +- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+SortMergeJoin(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, 
f])
+:- Exchange(distribution=[hash[a]])
+:  +- SortMergeJoin(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, 
d, e, f])
+:     :- Exchange(distribution=[hash[b]])
+:     :  +- SortMergeJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, 
c, d, e, f])
+:     :     :- Exchange(distribution=[hash[a]])
+:     :     :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
+:     :     :     +- TableSourceScan(table=[[default_catalog, 
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:     :     +- Exchange(distribution=[hash[d]])
+:     :        +- TableSourceScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+:     +- Exchange(distribution=[hash[j]])
+:        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
+:           +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
++- Exchange(distribution=[hash[i]])
+   +- Calc(select=[i], where=[<(j, 100)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -1994,26 +1995,24 @@ Calc(select=[b])
    :        :  :     :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :        :  :     :        +- Exchange(distribution=[single])
    :        :  :     :           +- 
LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_COUNT(i) AS 
count$1])
-   :        :  :     :              +- Calc(select=[i])
-   :        :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+   :        :  :     :              +- Calc(select=[i], reuse_id=[1])
+   :        :  :     :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2])
    :        :  :     +- Calc(select=[i, true AS i0])
    :        :  :        +- HashAggregate(isMerge=[true], groupBy=[i], 
select=[i])
    :        :  :           +- Exchange(distribution=[hash[i]])
    :        :  :              +- LocalHashAggregate(groupBy=[i], select=[i])
-   :        :  :                 +- Calc(select=[i, true AS i0])
-   :        :  :                    +- Reused(reference_id=[1])
+   :        :  :                 +- Reused(reference_id=[1])
    :        :  +- Exchange(distribution=[broadcast])
    :        :     +- SortAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
    :        :        +- Exchange(distribution=[single])
    :        :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS 
count1$0, Partial_COUNT(EXPR$0) AS count$1])
-   :        :              +- Calc(select=[CAST(j) AS EXPR$0])
-   :        :                 +- Reused(reference_id=[1])
+   :        :              +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3])
+   :        :                 +- Reused(reference_id=[2])
    :        +- Calc(select=[EXPR$0, true AS i])
    :           +- HashAggregate(isMerge=[true], groupBy=[EXPR$0], 
select=[EXPR$0])
    :              +- Exchange(distribution=[hash[EXPR$0]])
    :                 +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
-   :                    +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[3])
    +- Exchange(distribution=[hash[f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
index ce66d1f..919e964 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
@@ -32,7 +32,7 @@ LogicalProject(a=[$0], d=[$3])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], d=[$3])
-+- LogicalJoin(condition=[AND(OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 
2)), AND(=($0, 2), =($3, 1))), OR(=($0, 0), =($0, 1), =($0, 2)))], 
joinType=[inner])
++- LogicalJoin(condition=[AND(OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 
2)), AND(=($0, 2), =($3, 1))), OR(=(0, $0), =(1, $0), =(2, $0)))], 
joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
@@ -54,7 +54,7 @@ LogicalProject(a=[$0], d=[$3])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], d=[$3])
-+- LogicalJoin(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 
2), =($3, 1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], 
joinType=[inner])
++- LogicalJoin(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 
2), =($3, 1))), OR(=(1, $0), =(2, $0)), OR(=(2, $3), =(1, $3)))], 
joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
@@ -98,7 +98,7 @@ LogicalProject(a=[$0], d=[$3])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], d=[$3])
-+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), 
AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=($0, 1), =($1, 1)), 
AND(=($0, 2), =($1, 2))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 
1))))], joinType=[inner])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), 
AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=(1, $0), =(1, $1)), 
AND(=(2, $0), =(2, $1))), OR(AND(=(2, $3), =(2, $4)), AND(=(1, $3), =(1, 
$4))))], joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
@@ -120,7 +120,7 @@ LogicalProject(a=[$0], d=[$3])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], d=[$3])
-+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 
1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))), OR(=($0, 1), =($0, 
2)), OR(=($3, 2), =($3, 1)), OR(=($0, 3), =($0, 4)), OR(=($3, 4), =($3, 3)))], 
joinType=[inner])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 
1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))), OR(=(1, $0), =(2, 
$0)), OR(=(2, $3), =(1, $3)), OR(=(3, $0), =(4, $0)), OR(=(4, $3), =(3, $3)))], 
joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
@@ -152,7 +152,7 @@ LogicalProject(a=[$0], d=[$6])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], d=[$6])
-+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), 
AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=($0, 1), =($1, 1)), =($0, 
2)), OR(AND(=($3, 2), =($7, 2)), AND(=($4, 2), =($6, 1), =($7, 1))))], 
joinType=[inner])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), 
AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=(1, $0), =(1, $1)), =(2, 
$0)), OR(AND(=(2, $3), =(2, $7)), AND(=(2, $4), =(1, $6), =(1, $7))))], 
joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], 
h=[$7])
       +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
@@ -177,7 +177,7 @@ LogicalProject(a=[$0], d=[$3])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], d=[$3])
-+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), 
AND(=($3, 1), =($4, 1))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 
1))))], joinType=[inner])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), 
AND(=($3, 1), =($4, 1))), OR(AND(=(2, $3), =(2, $4)), AND(=(1, $3), =(1, 
$4))))], joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
@@ -199,7 +199,7 @@ LogicalProject(a=[$0], d=[$3])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], d=[$3])
-+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 
1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], joinType=[inner])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 
1))), OR(=(1, $0), =(2, $0)), OR(=(2, $3), =(1, $3)))], joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
index 19dfb77..1ec0d26 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
@@ -156,7 +156,7 @@ LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[100])
          +- LogicalJoin(condition=[=($7, $1)], joinType=[semi])
             :- LogicalTableScan(table=[[default_catalog, default_database, 
item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, 
i_category, i_color, i_units, i_size)]]])
             +- LogicalProject(i_manufact=[$1])
-               +- LogicalFilter(condition=[OR(AND(=($3, _UTF-16LE'Women'), 
OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, 
_UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, 
_UTF-16LE'extra large'))), AND(=($3, _UTF-16LE'Women'), OR(=($4, 
_UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), 
=($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), 
AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF-16LE'floral'), =( [...]
+               +- LogicalFilter(condition=[OR(AND(=($3, _UTF-16LE'Women'), 
OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, 
_UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, 
_UTF-16LE'extra large'))), AND(=(_UTF-16LE'Women', $3), OR(=($4, 
_UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), 
=($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), 
AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF-16LE'floral'), =( [...]
                   +- LogicalTableScan(table=[[default_catalog, 
default_database, item, source: [TestTableSource(i_manufact_id, i_manufact, 
i_product_name, i_category, i_color, i_units, i_size)]]])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml
index 7736dea..856ce59 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQueryAntiJoinTest.xml
@@ -46,22 +46,22 @@ LogicalFilter(condition=[<>($cor0.b, $1)])
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
 +- LogicalJoin(condition=[<>($1, $3)], joinType=[anti])
-   :- LogicalJoin(condition=[AND(OR(=($1, $3), IS NULL($1), IS NULL($3)), 
=($2, $4))], joinType=[anti])
-   :  :- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalJoin(condition=[$3], joinType=[semi])
+   :  :- LogicalJoin(condition=[AND(OR(=($1, $3), IS NULL($1), IS NULL($3)), 
=($2, $4))], joinType=[anti])
    :  :  :- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]])
    :  :  :  +- LogicalProject(d=[$0])
    :  :  :     +- LogicalTableScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]])
-   :  :  +- LogicalProject($f0=[IS NOT NULL($0)])
-   :  :     +- LogicalAggregate(group=[{}], m=[MIN($0)])
-   :  :        +- LogicalProject(i=[true])
-   :  :           +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT($0)])
-   :  :              +- LogicalProject(l=[$0])
-   :  :                 +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'Test')])
-   :  :                    +- LogicalTableScan(table=[[default_catalog, 
default_database, t2, source: [TestTableSource(l, m, n)]]])
-   :  +- LogicalProject(i=[$0], k=[$2])
-   :     +- LogicalFilter(condition=[>($0, 10)])
-   :        +- LogicalTableScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]])
+   :  :  +- LogicalProject(i=[$0], k=[$2])
+   :  :     +- LogicalFilter(condition=[>($0, 10)])
+   :  :        +- LogicalTableScan(table=[[default_catalog, default_database, 
t, source: [TestTableSource(i, j, k)]]])
+   :  +- LogicalProject($f0=[IS NOT NULL($0)])
+   :     +- LogicalAggregate(group=[{}], m=[MIN($0)])
+   :        +- LogicalProject(i=[true])
+   :           +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT($0)])
+   :              +- LogicalProject(l=[$0])
+   :                 +- LogicalFilter(condition=[LIKE($2, _UTF-16LE'Test')])
+   :                    +- LogicalTableScan(table=[[default_catalog, 
default_database, t2, source: [TestTableSource(l, m, n)]]])
    +- LogicalProject(e=[$1])
       +- LogicalFilter(condition=[true])
          +- LogicalTableScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 390832a..5c176e5 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -234,7 +234,7 @@ LogicalProject(a=[$0], EXPR$1=[$2], c=[$1])
 Calc(select=[a, EXPR$1, _UTF-16LE'test' AS c])
 +- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1])
    +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, _UTF-16LE'test' AS c, b])
+      +- Calc(select=[a, b])
          +- TableSourceScan(table=[[default_catalog, default_database, T, 
source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
index ddae158..55029e6 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/TwoStageAggregateTest.xml
@@ -175,7 +175,7 @@ Calc(select=[4 AS four, EXPR$1])
 +- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$1])
    +- Exchange(distribution=[hash[b]])
       +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
-         +- Calc(select=[b, 4 AS four, a])
+         +- Calc(select=[b, a])
             +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
index 7ef21d9..c2dfa03 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -516,28 +516,28 @@ LogicalFilter(condition=[<>($cor0.b, $1)])
       <![CDATA[
 Join(joinType=[LeftAntiJoin], where=[<>(b, e)], select=[a, b, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :- Exchange(distribution=[single])
-:  +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS 
NULL(i)), =(c, k))], select=[a, b, c], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
-:     :- Exchange(distribution=[hash[c]])
-:     :  +- Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-:     :     :- Exchange(distribution=[single])
+:  +- Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :- Exchange(distribution=[single])
+:     :  +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), 
IS NULL(i)), =(c, k))], select=[a, b, c], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+:     :     :- Exchange(distribution=[hash[c]])
 :     :     :  +- Join(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, 
c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :     :     :     :- Exchange(distribution=[hash[a]])
 :     :     :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 :     :     :     +- Exchange(distribution=[hash[d]])
 :     :     :        +- Calc(select=[d])
 :     :     :           +- TableSourceScan(table=[[default_catalog, 
default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f], 
reuse_id=[1])
-:     :     +- Exchange(distribution=[single])
-:     :        +- Calc(select=[IS NOT NULL(m) AS $f0])
-:     :           +- GroupAggregate(select=[MIN(i) AS m])
-:     :              +- Exchange(distribution=[single])
-:     :                 +- Calc(select=[true AS i])
-:     :                    +- GroupAggregate(groupBy=[l], select=[l])
-:     :                       +- Exchange(distribution=[hash[l]])
-:     :                          +- Calc(select=[l], where=[LIKE(n, 
_UTF-16LE'Test')])
-:     :                             +- 
TableSourceScan(table=[[default_catalog, default_database, t2, source: 
[TestTableSource(l, m, n)]]], fields=[l, m, n])
-:     +- Exchange(distribution=[hash[k]])
-:        +- Calc(select=[i, k], where=[>(i, 10)])
-:           +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+:     :     +- Exchange(distribution=[hash[k]])
+:     :        +- Calc(select=[i, k], where=[>(i, 10)])
+:     :           +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
+:     +- Exchange(distribution=[single])
+:        +- Calc(select=[IS NOT NULL(m) AS $f0])
+:           +- GroupAggregate(select=[MIN(i) AS m])
+:              +- Exchange(distribution=[single])
+:                 +- Calc(select=[true AS i])
+:                    +- GroupAggregate(groupBy=[l], select=[l])
+:                       +- Exchange(distribution=[hash[l]])
+:                          +- Calc(select=[l], where=[LIKE(n, 
_UTF-16LE'Test')])
+:                             +- TableSourceScan(table=[[default_catalog, 
default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n])
 +- Exchange(distribution=[single])
    +- Calc(select=[e])
       +- Reused(reference_id=[1])
@@ -692,14 +692,13 @@ Calc(select=[b])
    :        :     +- Exchange(distribution=[single])
    :        :        +- GroupAggregate(select=[COUNT(*) AS c])
    :        :           +- Exchange(distribution=[single])
-   :        :              +- Calc(select=[1 AS EXPR$0])
-   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], 
reuse_id=[1])
+   :        :              +- Calc(select=[1 AS EXPR$0], reuse_id=[1])
+   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
    :        +- Exchange(distribution=[single])
    :           +- Calc(select=[true AS i])
    :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
    :                 +- Exchange(distribution=[hash[EXPR$0]])
-   :                    +- Calc(select=[1 AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d, f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -1235,14 +1234,13 @@ Calc(select=[b])
    :        :     +- Exchange(distribution=[single])
    :        :        +- GroupAggregate(select=[COUNT(*) AS c])
    :        :           +- Exchange(distribution=[single])
-   :        :              +- Calc(select=[1 AS EXPR$0])
-   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], 
reuse_id=[1])
+   :        :              +- Calc(select=[1 AS EXPR$0], reuse_id=[1])
+   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])
    :        +- Exchange(distribution=[single])
    :           +- Calc(select=[true AS i])
    :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
    :                 +- Exchange(distribution=[hash[EXPR$0]])
-   :                    +- Calc(select=[1 AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d]])
       +- Calc(select=[d])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
@@ -1923,22 +1921,22 @@ LogicalFilter(condition=[AND(=($cor0.a, $0), <($1, 
100))])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+Join(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c, d, e, f], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :- Exchange(distribution=[hash[a]])
-:  +- Join(joinType=[LeftAntiJoin], where=[=(a, i)], select=[a, b, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-:     :- Exchange(distribution=[hash[a]])
-:     :  +- Join(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-:     :     :- Exchange(distribution=[hash[b]])
+:  +- Join(joinType=[LeftAntiJoin], where=[=(b, j)], select=[a, b, c, d, e, 
f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :- Exchange(distribution=[hash[b]])
+:     :  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, 
f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:     :     :- Exchange(distribution=[hash[a]])
 :     :     :  +- Calc(select=[a, b, c], where=[>=(CAST(c), 1:BIGINT)])
 :     :     :     +- TableSourceScan(table=[[default_catalog, 
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-:     :     +- Exchange(distribution=[hash[j]])
-:     :        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
-:     :           +- TableSourceScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k], 
reuse_id=[1])
-:     +- Exchange(distribution=[hash[i]])
-:        +- Calc(select=[i], where=[<(j, 100)])
-:           +- Reused(reference_id=[1])
-+- Exchange(distribution=[hash[d]])
-   +- TableSourceScan(table=[[default_catalog, default_database, r, source: 
[TestTableSource(d, e, f)]]], fields=[d, e, f])
+:     :     +- Exchange(distribution=[hash[d]])
+:     :        +- TableSourceScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+:     +- Exchange(distribution=[hash[j]])
+:        +- Calc(select=[j], where=[>(CAST(k), 50:BIGINT)])
+:           +- TableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
++- Exchange(distribution=[hash[i]])
+   +- Calc(select=[i], where=[<(j, 100)])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -2287,25 +2285,23 @@ Calc(select=[b])
    :        :     :        :     +- Exchange(distribution=[single])
    :        :     :        :        +- GroupAggregate(select=[COUNT(*) AS c, 
COUNT(i) AS ck])
    :        :     :        :           +- Exchange(distribution=[single])
-   :        :     :        :              +- Calc(select=[i])
-   :        :     :        :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[1])
+   :        :     :        :              +- Calc(select=[i], reuse_id=[1])
+   :        :     :        :                 +- 
TableSourceScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]], fields=[i, j, k], reuse_id=[2])
    :        :     :        +- Exchange(distribution=[hash[i]])
    :        :     :           +- Calc(select=[i, true AS i0])
    :        :     :              +- GroupAggregate(groupBy=[i], select=[i])
    :        :     :                 +- Exchange(distribution=[hash[i]])
-   :        :     :                    +- Calc(select=[i, true AS i0])
-   :        :     :                       +- Reused(reference_id=[1])
+   :        :     :                    +- Reused(reference_id=[1])
    :        :     +- Exchange(distribution=[single])
    :        :        +- GroupAggregate(select=[COUNT(*) AS c, COUNT(EXPR$0) AS 
ck])
    :        :           +- Exchange(distribution=[single])
-   :        :              +- Calc(select=[CAST(j) AS EXPR$0])
-   :        :                 +- Reused(reference_id=[1])
+   :        :              +- Calc(select=[CAST(j) AS EXPR$0], reuse_id=[3])
+   :        :                 +- Reused(reference_id=[2])
    :        +- Exchange(distribution=[hash[EXPR$0]])
    :           +- Calc(select=[EXPR$0, true AS i])
    :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
    :                 +- Exchange(distribution=[hash[EXPR$0]])
-   :                    +- Calc(select=[CAST(j) AS EXPR$0, true AS i])
-   :                       +- Reused(reference_id=[1])
+   :                    +- Reused(reference_id=[3])
    +- Exchange(distribution=[hash[f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
index fd6047b..e4218f0 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/AggregateTest.xml
@@ -130,7 +130,7 @@ LogicalProject(four=[$1], EXPR$0=[$2])
 Calc(select=[4 AS four, EXPR$0])
 +- GroupAggregate(groupBy=[a], select=[a, SUM(b) AS EXPR$0])
    +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, 4 AS four, b])
+      +- Calc(select=[a, b])
          +- TableSourceScan(table=[[default_catalog, default_database, Table1, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -149,7 +149,7 @@ LogicalProject(four=[$1], EXPR$0=[$2])
 Calc(select=[4 AS four, EXPR$0])
 +- GroupAggregate(groupBy=[b], select=[b, SUM(a) AS EXPR$0])
    +- Exchange(distribution=[hash[b]])
-      +- Calc(select=[b, 4 AS four, a])
+      +- Calc(select=[b, a])
          +- TableSourceScan(table=[[default_catalog, default_database, Table1, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
index 8b59588..9bcdc6b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TwoStageAggregateTest.xml
@@ -91,7 +91,7 @@ Calc(select=[4 AS four, EXPR$0])
 +- GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) AS EXPR$0])
    +- Exchange(distribution=[hash[a]])
       +- LocalGroupAggregate(groupBy=[a], select=[a, SUM(b) AS sum$0])
-         +- Calc(select=[a, 4 AS four, b])
+         +- Calc(select=[a, b])
             +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                +- TableSourceScan(table=[[default_catalog, default_database, 
Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -112,7 +112,7 @@ Calc(select=[4 AS four, EXPR$0])
 +- GlobalGroupAggregate(groupBy=[b], select=[b, SUM(sum$0) AS EXPR$0])
    +- Exchange(distribution=[hash[b]])
       +- LocalGroupAggregate(groupBy=[b], select=[b, SUM(a) AS sum$0])
-         +- Calc(select=[b, 4 AS four, a])
+         +- Calc(select=[b, a])
             +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                +- TableSourceScan(table=[[default_catalog, default_database, 
Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
index 8848881..d62a84d 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
@@ -704,13 +704,13 @@ class DecimalTypeTest extends ExpressionTestBase {
       'f42 % 'f41,
       "f42 % f41",
       "mod(f42, f41)",
-      "2.00")
+      "2.0000")
 
     testAllApis(
       'f41 % 'f43,
       "f41 % f43",
       "mod(f41, f43)",
-      "3")
+      "3.00")
 
     testAllApis(
       'f43 % 'f41,
@@ -749,7 +749,7 @@ class DecimalTypeTest extends ExpressionTestBase {
       'f46 % 'f47,
       "f46 % f47",
       "mod(f46, f47)",
-      "3.12")
+      "3.1234")
   }
 
   @Test  // functions that treat Decimal as exact value
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
index 09d4916..f4814c4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.scala
@@ -162,25 +162,4 @@ class ShuffledHashSemiAntiJoinTest extends 
SemiAntiJoinTestBase {
     super.testNotInWithUncorrelated_SimpleCondition3()
   }
 
-  @Test
-  override def testExistsWithCorrelated_LateralTableInSubQuery(): Unit = {
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Cannot generate a valid execution plan for the given 
query")
-    super.testExistsWithCorrelated_LateralTableInSubQuery()
-  }
-
-  @Test
-  override def testInWithUncorrelated_LateralTableInSubQuery(): Unit = {
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Cannot generate a valid execution plan for the given 
query")
-    super.testInWithUncorrelated_LateralTableInSubQuery()
-  }
-
-  @Test
-  override def testInWithCorrelated_LateralTableInSubQuery(): Unit = {
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Cannot generate a valid execution plan for the given 
query")
-    super.testInWithCorrelated_LateralTableInSubQuery()
-  }
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
index 29ee98d..12e2ee8 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
@@ -43,7 +43,7 @@ class FlinkRexUtilTest {
     val i_size = rexBuilder.makeInputRef(varcharType, 4)
 
     // this predicate contains 95 RexCalls. however,
-    // if this predicate is converted to CNF, the result contains 736450 
RexCalls.
+    // if this predicate is converted to CNF, the result contains 557715 
RexCalls.
     val predicate = rexBuilder.makeCall(OR,
       rexBuilder.makeCall(AND,
         rexBuilder.makeCall(EQUALS, i_manufact, 
rexBuilder.makeLiteral("able")),
@@ -181,10 +181,10 @@ class FlinkRexUtilTest {
     val newPredicate1 = FlinkRexUtil.toCnf(rexBuilder, -1, predicate)
     assertEquals(predicate.toString, newPredicate1.toString)
 
-    val newPredicate2 = FlinkRexUtil.toCnf(rexBuilder, 736449, predicate)
+    val newPredicate2 = FlinkRexUtil.toCnf(rexBuilder, 557714, predicate)
     assertEquals(predicate.toString, newPredicate2.toString)
 
-    val newPredicate3 = FlinkRexUtil.toCnf(rexBuilder, 736450, predicate)
+    val newPredicate3 = FlinkRexUtil.toCnf(rexBuilder, 557715, predicate)
     assertEquals(RexUtil.toCnf(rexBuilder, predicate).toString, 
newPredicate3.toString)
 
     val newPredicate4 = FlinkRexUtil.toCnf(rexBuilder, Int.MaxValue, predicate)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
index cbb3c5d..dfcced1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
@@ -28,10 +28,15 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
  * A bridge between a Flink's specific {@link QueryOperationCatalogView} and a 
Calcite's
  * {@link org.apache.calcite.schema.Table}. It implements {@link 
TranslatableTable} interface. This enables
@@ -49,7 +54,30 @@ public class QueryOperationCatalogViewTable extends 
AbstractTable implements Tra
        public static QueryOperationCatalogViewTable 
createCalciteTable(QueryOperationCatalogView catalogView) {
                return new QueryOperationCatalogViewTable(catalogView, 
typeFactory -> {
                        TableSchema tableSchema = catalogView.getSchema();
-                       return ((FlinkTypeFactory) 
typeFactory).buildLogicalRowType(tableSchema);
+                       final FlinkTypeFactory flinkTypeFactory = 
(FlinkTypeFactory) typeFactory;
+                       final RelDataType relType = 
flinkTypeFactory.buildLogicalRowType(tableSchema);
+                       Boolean[] nullables = tableSchema
+                               .getTableColumns()
+                               .stream()
+                               .map(c -> 
c.getType().getLogicalType().isNullable())
+                               .toArray(Boolean[]::new);
+                       final List<RelDataTypeField> fields = relType
+                               .getFieldList()
+                               .stream()
+                               .map(f -> {
+                                       boolean nullable = 
nullables[f.getIndex()];
+                                       if (nullable != f.getType().isNullable()
+                                               && 
!FlinkTypeFactory.isTimeIndicatorType(f.getType())) {
+                                               return new RelDataTypeFieldImpl(
+                                                       f.getName(),
+                                                       f.getIndex(),
+                                                       
flinkTypeFactory.createTypeWithNullability(f.getType(), nullable));
+                                       } else {
+                                               return f;
+                                       }
+                               })
+                               .collect(Collectors.toList());
+                       return flinkTypeFactory.createStructType(fields);
                });
        }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java
index c30d741..d5e581a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/PlannerQueryOperation.java
@@ -19,9 +19,10 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
@@ -43,11 +44,22 @@ public class PlannerQueryOperation implements 
QueryOperation {
 
                RelDataType rowType = calciteTree.getRowType();
                String[] fieldNames = rowType.getFieldNames().toArray(new 
String[0]);
-               TypeInformation[] fieldTypes = rowType.getFieldList()
+               DataType[] fieldTypes = rowType.getFieldList()
                        .stream()
-                       .map(field -> 
FlinkTypeFactory.toTypeInfo(field.getType())).toArray(TypeInformation[]::new);
+                       .map(field -> {
+                               final DataType fieldType = TypeConversions
+                                       
.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(field.getType()));
+                               final boolean nullable = 
field.getType().isNullable();
+                               if (nullable != 
fieldType.getLogicalType().isNullable()
+                                       && 
!FlinkTypeFactory.isTimeIndicatorType(field.getType())) {
+                                       return nullable ? fieldType.nullable() 
: fieldType.notNull();
+                               } else {
+                                       return fieldType;
+                               }
+                       })
+                       .toArray(DataType[]::new);
 
-               this.tableSchema = new TableSchema(fieldNames, fieldTypes);
+               this.tableSchema = TableSchema.builder().fields(fieldNames, 
fieldTypes).build();
        }
 
        public RelNode getCalciteTree() {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index 77a5a83..74f71c1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -376,11 +376,11 @@ class GroupWindowTest extends TableTestBase {
           unaryNode(
             "DataSetCalc",
             batchTableNode(table),
-            term("select", "CASE(=(a, 1), 1, 99) AS correct, rowtime")
+            term("select", "rowtime, CASE(=(a, 1), 1, 99) AS $f1")
           ),
           term("window", "TumblingGroupWindow('w$, 'rowtime, 900000.millis)"),
-          term("select", "SUM(correct) AS s, AVG(correct) AS a, start('w$) AS 
w$start," +
-            " end('w$) AS w$end, rowtime('w$) AS w$rowtime")
+          term("select", "SUM($f1) AS s, AVG($f1) AS a, start('w$) AS w$start,"
+            + " end('w$) AS w$end, rowtime('w$) AS w$rowtime")
         ),
         term("select", "CAST(s) AS s", "CAST(a) AS a", "CAST(w$start) AS 
wStart")
       )
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
index 1b3daaa..056c882 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
@@ -64,16 +64,16 @@ class SetOperatorsTest extends TableTestBase {
             unaryNode(
               "DataSetCalc",
               batchTableNode(table1),
-              term("select", "b_long AS b_long3", "true AS $f0"),
+              term("select", "b_long", "true AS $f0"),
               term("where", "IS NOT NULL(b_long)")
             ),
-            term("groupBy", "b_long3"),
-            term("select", "b_long3", "MIN($f0) AS $f1")
+            term("groupBy", "b_long"),
+            term("select", "b_long", "MIN($f0) AS $f1")
           ),
-          term("select", "b_long3")
+          term("select", "b_long")
         ),
-        term("where", "=(a_long, b_long3)"),
-        term("join", "a_long", "a_int", "a_string", "b_long3"),
+        term("where", "=(a_long, b_long)"),
+        term("join", "a_long", "a_int", "a_string", "b_long"),
         term("joinType", "InnerJoin")
       ),
       term("select", "a_int", "a_string")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index c501390..ede7541 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -351,7 +351,7 @@ class CalcTest extends TableTestBase {
           term("groupBy", "word"),
           term("select", "word", "SUM(frequency) AS EXPR$0")
         ),
-        term("select", "word, EXPR$0 AS frequency"),
+        term("select", "word, EXPR$0"),
         term("where", "=(EXPR$0, 2)")
       )
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index c7c4aeb..733470d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -244,9 +244,9 @@ class GroupWindowTest extends TableTestBase {
                 "rowtime('w$) AS w$rowtime",
                 "proctime('w$) AS w$proctime")
             ),
-            term("select", "w$rowtime AS zzzzz")
+            term("select", "w$rowtime AS $f2")
           ),
-          term("window", "TumblingGroupWindow('w$, 'zzzzz, 4.millis)"),
+          term("window", "TumblingGroupWindow('w$, '$f2, 4.millis)"),
           term("select",
             "COUNT(*) AS a",
             "start('w$) AS w$start",
@@ -329,12 +329,12 @@ class GroupWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(table),
-            term("select", "CASE(=(a, 1), 1, 99) AS correct", "rowtime")
+            term("select", "rowtime", "CASE(=(a, 1), 1, 99) AS $f1")
           ),
           term("window", "TumblingGroupWindow('w$, 'rowtime, 900000.millis)"),
           term("select",
-            "SUM(correct) AS s",
-            "AVG(correct) AS a",
+            "SUM($f1) AS s",
+            "AVG($f1) AS a",
             "start('w$) AS w$start",
             "end('w$) AS w$end",
             "rowtime('w$) AS w$rowtime",
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 89ad599..dcc86ab 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -290,7 +290,7 @@ class JoinTest extends TableTestBase {
           ),
           unaryNode("DataStreamCalc",
             streamTableNode(t2),
-            term("select", "a", "c", "proctime", "CAST(12:BIGINT) AS 
nullField")
+            term("select", "a", "c", "proctime", "12:BIGINT AS nullField")
           ),
           term("where", "AND(=(a, a0), =(nullField, nullField0), 
>=(PROCTIME(proctime), " +
             "-(PROCTIME(proctime0), 5000:INTERVAL SECOND)), 
<=(PROCTIME(proctime), " +
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
index 05874bc..cad41e9 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
@@ -139,15 +139,11 @@ class TemporalTableJoinTest extends TableTestBase {
         "DataStreamCalc",
         binaryNode(
           "DataStreamTemporalTableJoin",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(orders),
-            term("select", "o_rowtime, o_amount, o_currency, o_secondary_key")
-          ),
+          streamTableNode(orders),
           unaryNode(
             "DataStreamCalc",
             streamTableNode(ratesHistory),
-            term("select", "rowtime, currency, rate, secondary_key"),
+            term("select", "rowtime, comment, currency, rate, secondary_key"),
             term("where", ">(rate, 110:BIGINT)")
           ),
           term(
@@ -158,10 +154,12 @@ class TemporalTableJoinTest extends TableTestBase {
           term(
             "join",
             "o_rowtime",
+            "o_comment",
             "o_amount",
             "o_currency",
             "o_secondary_key",
             "rowtime",
+            "comment",
             "currency",
             "rate",
             "secondary_key"),
@@ -224,15 +222,12 @@ class TemporalTableJoinTest extends TableTestBase {
       binaryNode(
         "DataStreamTemporalTableJoin",
         streamTableNode(proctimeOrders),
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(proctimeRatesHistory),
-          term("select", "currency, rate")),
+        streamTableNode(proctimeRatesHistory),
         term("where",
           "AND(" +
             s"${TEMPORAL_JOIN_CONDITION.getName}(o_proctime, currency), " +
             "=(currency, o_currency))"),
-        term("join", "o_amount", "o_currency", "o_proctime", "currency", 
"rate"),
+        term("join", "o_amount", "o_currency", "o_proctime", "currency", 
"rate", "proctime"),
         term("joinType", "InnerJoin")
       ),
       term("select", "*(o_amount, rate) AS rate")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
index 5e69382..0c0f096 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -111,15 +111,11 @@ class TemporalTableJoinTest extends TableTestBase {
         "DataStreamCalc",
         binaryNode(
           "DataStreamTemporalTableJoin",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(orders),
-            term("select", "o_rowtime, o_amount, o_currency, o_secondary_key")
-          ),
+          streamTableNode(orders),
           unaryNode(
             "DataStreamCalc",
             streamTableNode(ratesHistory),
-            term("select", "rowtime, currency, rate, secondary_key"),
+            term("select", "rowtime, comment, currency, rate, secondary_key"),
             term("where", ">(rate, 110:BIGINT)")
           ),
           term(
@@ -130,10 +126,12 @@ class TemporalTableJoinTest extends TableTestBase {
           term(
             "join",
             "o_rowtime",
+            "o_comment",
             "o_amount",
             "o_currency",
             "o_secondary_key",
             "rowtime",
+            "comment",
             "currency",
             "rate",
             "secondary_key"),
@@ -240,15 +238,12 @@ class TemporalTableJoinTest extends TableTestBase {
       binaryNode(
         "DataStreamTemporalTableJoin",
         streamTableNode(proctimeOrders),
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(proctimeRatesHistory),
-          term("select", "currency, rate")),
+        streamTableNode(proctimeRatesHistory),
         term("where",
           "AND(" +
             s"${TEMPORAL_JOIN_CONDITION.getName}(o_proctime, currency), " +
             "=(currency, o_currency))"),
-        term("join", "o_amount", "o_currency", "o_proctime", "currency", 
"rate"),
+        term("join", "o_amount", "o_currency", "o_proctime", "currency", 
"rate", "proctime"),
         term("joinType", "InnerJoin")
       ),
       term("select", "*(o_amount, rate) AS rate")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index 2e66dc8..1a83546 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -423,8 +423,8 @@ class RexProgramExtractorTest extends RexProgramTestBase {
     )
     assertExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(2, unconvertedRexNodes.length)
-    assertEquals(">(CAST($2):BIGINT NOT NULL, 100)", 
unconvertedRexNodes(0).toString)
-    assertEquals("OR(>(CAST($2):BIGINT NOT NULL, 100), <=($2, $1))",
+    assertEquals("<(100, CAST($2):BIGINT NOT NULL)", 
unconvertedRexNodes(0).toString)
+    assertEquals("OR(>=($1, $2), <(100, CAST($2):BIGINT NOT NULL))",
       unconvertedRexNodes(1).toString)
   }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
 
b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
index 1ef95b0..40fce87 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
+++ 
b/flink-table/flink-table-planner/src/test/scala/resources/testSqlUpdateAndToDataStream.out
@@ -1,6 +1,6 @@
 == Abstract Syntax Tree ==
 LogicalProject(first=[$0])
-  EnumerableTableScan(table=[[default_catalog, default_database, MyTable]])
+  LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 
 == Optimized Logical Plan ==
 StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[first], source=[CsvTableSource(read fields: first)])

Reply via email to