This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 33278628dc5 [FLINK-31273][table-planner] Fix left join with IS_NULL 
filter be wrongly pushed down and get wrong join results
33278628dc5 is described below

commit 33278628dc599bed8944733efb9495ce77993d4b
Author: zhengyunhong.zyh <[email protected]>
AuthorDate: Wed Mar 1 15:28:50 2023 +0800

    [FLINK-31273][table-planner] Fix left join with IS_NULL filter be wrongly 
pushed down and get wrong join results
    
    This closes #22049
    
    (cherry picked from commit 8990822bd77d70f3249e1220a853e16dadd8ef54)
---
 .../plan/rules/logical/FlinkFilterJoinRule.java    |  44 +-
 .../rules/logical/FlinkFilterJoinRuleTest.java     | 102 +++++
 .../plan/batch/sql/join/BroadcastHashJoinTest.xml  |  69 ++++
 .../plan/batch/sql/join/NestedLoopJoinTest.xml     | 214 +++++++---
 .../plan/batch/sql/join/ShuffledHashJoinTest.xml   | 335 +++++++++------
 .../plan/batch/sql/join/SortMergeJoinTest.xml      |  71 ++++
 .../plan/rules/logical/FlinkFilterJoinRuleTest.xml | 456 ++++++++++++++++++++-
 .../planner/plan/batch/sql/join/JoinTestBase.scala |  28 +-
 .../plan/batch/sql/join/NestedLoopJoinTest.scala   |  10 +-
 .../runtime/batch/sql/join/JoinITCase.scala        | 108 +++++
 .../planner/runtime/stream/sql/JoinITCase.scala    |  69 ++++
 11 files changed, 1295 insertions(+), 211 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
index 47d7a44f69e..375d2ff1814 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
@@ -36,11 +36,13 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
@@ -53,9 +55,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.calcite.plan.RelOptUtil.conjunctions;
@@ -80,6 +84,20 @@ public abstract class FlinkFilterJoinRule<C extends 
FlinkFilterJoinRule.Config>
     public static final FlinkJoinConditionPushRule JOIN_CONDITION_PUSH =
             
FlinkJoinConditionPushRule.FlinkFilterJoinRuleConfig.DEFAULT.toRule();
 
+    // For left/right joins, not every filter condition can be pushed to the other 
side after deduction.
+    // This set lists the kinds of filter conditions that are supported.
+    public static final Set<SqlKind> SUITABLE_FILTER_TO_PUSH =
+            new HashSet() {
+                {
+                    add(SqlKind.EQUALS);
+                    add(SqlKind.GREATER_THAN);
+                    add(SqlKind.GREATER_THAN_OR_EQUAL);
+                    add(SqlKind.LESS_THAN);
+                    add(SqlKind.LESS_THAN_OR_EQUAL);
+                    add(SqlKind.NOT_EQUALS);
+                }
+            };
+
     /** Creates a FilterJoinRule. */
     protected FlinkFilterJoinRule(C config) {
         super(config);
@@ -353,7 +371,7 @@ public abstract class FlinkFilterJoinRule<C extends 
FlinkFilterJoinRule.Config>
         for (RexNode filter : filtersToPush) {
             final RelOptUtil.InputFinder inputFinder = 
RelOptUtil.InputFinder.analyze(filter);
             final ImmutableBitSet inputBits = inputFinder.build();
-            if (filter.isAlwaysTrue()) {
+            if (!isSuitableFilterToPush(filter, joinType)) {
                 continue;
             }
 
@@ -386,6 +404,30 @@ public abstract class FlinkFilterJoinRule<C extends 
FlinkFilterJoinRule.Config>
         }
     }
 
+    private boolean isSuitableFilterToPush(RexNode filter, JoinRelType 
joinType) {
+        if (filter.isAlwaysTrue()) {
+            return false;
+        }
+        if (joinType == JoinRelType.INNER) {
+            return true;
+        }
+        // For left/right outer joins, we currently only support pushing the 
conditions in the set
+        // SUITABLE_FILTER_TO_PUSH to the other side. Take a left outer join with an 
IS_NULL condition as an
+        // example: if the right side of the join has an IS_NULL filter and we push it 
to the left side
+        // while the left side has another filter on the same column, the two filters 
conflict
+        // and a wrong plan is generated.
+        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)
+                && filter instanceof RexCall) {
+            RexCall rexCall = (RexCall) filter;
+            if (SUITABLE_FILTER_TO_PUSH.contains(rexCall.op.kind)
+                    && (rexCall.getOperands().get(0) instanceof RexLiteral
+                            || rexCall.getOperands().get(1) instanceof 
RexLiteral)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private RexNode remapFilter(
             ImmutableIntList oldKeys,
             ImmutableIntList newKeys,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
index c1d195dceef..4ec5f93f6cc 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
@@ -216,6 +216,48 @@ public class FlinkFilterJoinRuleTest extends TableTestBase 
{
                 "SELECT * FROM MyTable2, MyTable1 WHERE b1 = b2 AND c1 = c2 
AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ");
     }
 
+    @Test
+    public void testInnerJoinWithNullFilter() {
+        util.verifyRelPlan(
+                "SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE 
a2 IS NULL");
+    }
+
+    @Test
+    public void testInnerJoinWithNullFilter2() {
+        util.verifyRelPlan(
+                "SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE 
a2 IS NULL AND a1 < 10");
+    }
+
+    @Test
+    public void testInnerJoinWithFilter1() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = 
a2 WHERE a2 < 1");
+    }
+
+    @Test
+    public void testInnerJoinWithFilter2() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = 
a2 WHERE a2 <> 1");
+    }
+
+    @Test
+    public void testInnerJoinWithFilter3() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = 
a2 WHERE a2 > 1");
+    }
+
+    @Test
+    public void testInnerJoinWithFilter4() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = 
a2 WHERE a2 >= 1");
+    }
+
+    @Test
+    public void testInnerJoinWithFilter5() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = 
a2 WHERE a2 <= 1");
+    }
+
+    @Test
+    public void testInnerJoinWithFilter6() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = 
a2 WHERE a2 = null");
+    }
+
     @Test
     public void testLeftJoinWithSomeFiltersFromLeftSide() {
         util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a1 = 2");
@@ -240,6 +282,50 @@ public class FlinkFilterJoinRuleTest extends TableTestBase 
{
                 "SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON true WHERE b1 = 
b2 AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ");
     }
 
+    @Test
+    public void testLeftJoinWithNullFilterInRightSide() {
+        // Even though there is a filter 'a2 IS NULL', 'a1 IS NULL' cannot be 
derived for a left
+        // join, and this filter cannot be pushed down to either MyTable1 or 
MyTable2.
+        util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a2 IS NULL");
+    }
+
+    @Test
+    public void testLeftJoinWithNullFilterInRightSide2() {
+        // 'a1 IS NULL' cannot be inferred from 'a2 IS NULL'.
+        util.verifyRelPlan(
+                "SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 
IS NULL AND a1 < 10");
+    }
+
+    @Test
+    public void testLeftJoinWithFilter1() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a2 < 1");
+    }
+
+    @Test
+    public void testLeftJoinWithFilter2() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a2 <> 1");
+    }
+
+    @Test
+    public void testLeftJoinWithFilter3() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a2 > 1");
+    }
+
+    @Test
+    public void testLeftJoinWithFilter4() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a2 >= 1");
+    }
+
+    @Test
+    public void testLeftJoinWithFilter5() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a2 <= 1");
+    }
+
+    @Test
+    public void testLeftJoinWithFilter6() {
+        util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = 
a2 WHERE a2 = null");
+    }
+
     @Test
     public void testRightJoinWithAllFilterInONClause() {
         util.verifyRelPlan("SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = 
a2 AND a1 = 2");
@@ -264,6 +350,22 @@ public class FlinkFilterJoinRuleTest extends TableTestBase 
{
                 "SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON true WHERE b1 = 
b2 AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ");
     }
 
+    @Test
+    public void testRightJoinWithNullFilterInLeftSide() {
+        // Even though there is a filter 'a1 IS NULL', 'a2 IS NULL' cannot be 
derived for a right
+        // join, and this filter cannot be pushed down to either MyTable1 or 
MyTable2.
+        util.verifyRelPlan(
+                "SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE 
a1 IS NULL");
+    }
+
+    @Test
+    public void testRightJoinWithNullFilterInRightSide2() {
+        // 'a2 IS NULL' cannot be inferred from 'a1 IS NULL'. However, 'a1 < 10' can 
be inferred from 'a2 < 10',
+        // and both of them can be pushed down.
+        util.verifyRelPlan(
+                "SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE 
a1 IS NULL AND a2 < 10");
+    }
+
     @Test
     public void testFullJoinWithAllFilterInONClause() {
         util.verifyRelPlan("SELECT * FROM MyTable1 FULL JOIN MyTable2 ON a1 = 
a2 AND a1 = 2");
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
index 20d2814d731..77015bd4c71 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
@@ -410,6 +410,75 @@ Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
       +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS 
count$0])
          +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
             +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter2">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d IS NULL AND a < 12]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- HashJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e, f], 
isBroadcast=[true], build=[right])
+   :- Calc(select=[a], where=[(a < 12)])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[broadcast])
+      +- Calc(select=[d, e, f], where=[(d < 12)])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter3">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d < 10 AND a < 12]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[d, e, f])
++- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f], 
isBroadcast=[true], build=[left])
+   :- Exchange(distribution=[broadcast])
+   :  +- Calc(select=[a], where=[(a < 10)])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Calc(select=[d, e, f], where=[(d < 10)])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter4">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d = null]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
index 9c8477cde77..309804ae6f4 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
@@ -223,27 +223,6 @@ Calc(select=[c, g])
    :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
    +- Calc(select=[a, c], where=[(a < 2)])
       +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN  MyTable1 ON a = d AND d 
< 2 AND b < h]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
-+- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))], 
joinType=[left])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-NestedLoopJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b < 
h))], select=[d, e, f, g, h, a, b, c], build=[right])
-:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-+- Exchange(distribution=[broadcast])
-   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -381,6 +360,46 @@ Calc(select=[c, g])
    :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
    +- Calc(select=[d, e, g])
       +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithJoinConditionPushDown">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM
+   (select a, count(b) as b from MyTable1 group by a)
+   join
+   (select d, count(e) as e from MyTable2 group by d)
+   on a = d and b = e and d = 2 and b = 1
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+   :  +- LogicalProject(a=[$0], b=[$1])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
+      +- LogicalProject(d=[$0], e=[$1])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
++- MultipleInput(readOrder=[0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d], 
build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- Calc(select=[d], 
where=[(e = 1)])\n   +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, 
Final_COUNT(count$0) AS e])\n      +- [#2] Exchange(distribution=[hash[d]])\n])
+   :- Exchange(distribution=[broadcast])
+   :  +- Calc(select=[a], where=[(b = 1)])
+   :     +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_COUNT(count$0) AS b])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) 
AS count$0])
+   :              +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
+   :                 +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+   +- Exchange(distribution=[hash[d]])
+      +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS 
count$0])
+         +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], 
fields=[d, e, f, g, h])
 ]]>
     </Resource>
   </TestCase>
@@ -621,67 +640,49 @@ Calc(select=[c, g])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRightOuterJoinWithEquiPred">
+  <TestCase name="testLeftOuterJoinWithFilter1">
     <Resource name="sql">
-      <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d = 10 AND a < 12]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalJoin(condition=[=($1, $4)], joinType=[right])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($3, 10), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[c, g])
-+- NestedLoopJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e, 
g], build=[left])
+Calc(select=[CAST(10 AS INTEGER) AS d, e, f])
++- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, e, f], 
build=[left])
    :- Exchange(distribution=[broadcast])
-   :  +- Calc(select=[b, c])
+   :  +- Calc(select=[a], where=[(a = 10)])
    :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Calc(select=[e, g])
+   +- Calc(select=[e, f], where=[(d = 10)])
       +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testInnerJoinWithJoinConditionPushDown">
+  <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
     <Resource name="sql">
-      <![CDATA[
-SELECT * FROM
-   (select a, count(b) as b from MyTable1 group by a)
-   join
-   (select d, count(e) as e from MyTable2 group by d)
-   on a = d and b = e and d = 2 and b = 1
-]]>
+      <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN  MyTable1 ON a = d AND d 
< 2 AND b < h]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))], 
joinType=[inner])
-   :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
-   :  +- LogicalProject(a=[$0], b=[$1])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
-      +- LogicalProject(d=[$0], e=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
++- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))], 
joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
-+- MultipleInput(readOrder=[0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d], 
build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- Calc(select=[d], 
where=[(e = 1)])\n   +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, 
Final_COUNT(count$0) AS e])\n      +- [#2] Exchange(distribution=[hash[d]])\n])
-   :- Exchange(distribution=[broadcast])
-   :  +- Calc(select=[a], where=[(b = 1)])
-   :     +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_COUNT(count$0) AS b])
-   :        +- Exchange(distribution=[hash[a]])
-   :           +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) 
AS count$0])
-   :              +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
-   :                 +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
-   +- Exchange(distribution=[hash[d]])
-      +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS 
count$0])
-         +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
-            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], 
fields=[d, e, f, g, h])
+NestedLoopJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b < 
h))], select=[d, e, f, g, h, a, b, c], build=[right])
+:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- Exchange(distribution=[broadcast])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -706,6 +707,75 @@ Calc(select=[c, g])
    +- Exchange(distribution=[broadcast])
       +- Calc(select=[e, g])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter2">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d IS NULL AND a < 12]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e, 
f], build=[right])
+   :- Calc(select=[a], where=[(a < 12)])
+   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[broadcast])
+      +- Calc(select=[d, e, f], where=[(d < 12)])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter3">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d < 10 AND a < 12]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[d, e, f])
++- NestedLoopJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f], 
build=[left])
+   :- Exchange(distribution=[broadcast])
+   :  +- Calc(select=[a], where=[(a < 10)])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Calc(select=[d, e, f], where=[(d < 10)])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter4">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d = null]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
@@ -746,6 +816,30 @@ Calc(select=[a, CAST(b AS BIGINT) AS b, CAST(2 AS INTEGER) 
AS d, e])
       +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS 
count$0])
          +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
             +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], 
fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightOuterJoinWithEquiPred">
+    <Resource name="sql">
+      <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], g=[$6])
++- LogicalJoin(condition=[=($1, $4)], joinType=[right])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[c, g])
++- NestedLoopJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e, 
g], build=[left])
+   :- Exchange(distribution=[broadcast])
+   :  +- Calc(select=[b, c])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Calc(select=[e, g])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
index 8b50a7ce09f..ac56e90fd11 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
@@ -137,6 +137,28 @@ Calc(select=[c, g])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, c], where=[(a < 2)])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN  MyTable1 ON a = d AND d 
< 2 AND b < h]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
++- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))], 
joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b < h))], 
select=[d, e, f, g, h, a, b, c], build=[right])
+:- Exchange(distribution=[hash[d]])
+:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- Exchange(distribution=[hash[a]])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -280,43 +302,6 @@ Calc(select=[c, g])
    +- Exchange(distribution=[hash[e, d]])
       +- Calc(select=[d, e, g])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testInnerJoinWithJoinConditionPushDown">
-    <Resource name="sql">
-      <![CDATA[
-SELECT * FROM
-   (select a, count(b) as b from MyTable1 group by a)
-   join
-   (select d, count(e) as e from MyTable2 group by d)
-   on a = d and b = e and d = 2 and b = 1
-]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))], 
joinType=[inner])
-   :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
-   :  +- LogicalProject(a=[$0], b=[$1])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
-      +- LogicalProject(d=[$0], e=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
-+- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(a = d)], select=[a, d], build=[right])\n:- Calc(select=[a], where=[(b = 
1)])\n:  +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_COUNT(count$0) AS b])\n:     +- [#1] Exchange(distribution=[hash[a]])\n+- 
Calc(select=[d], where=[(e = 1)])\n   +- HashAggregate(isMerge=[true], 
groupBy=[d], select=[d, Final_COUNT(count$0) AS e])\n      +- [#2] 
Exchange(distribution=[hash[d]])\n])
-   :- Exchange(distribution=[hash[a]])
-   :  +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS 
count$0])
-   :     +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
-   :        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
-   +- Exchange(distribution=[hash[d]])
-      +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS 
count$0])
-         +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
-            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], 
fields=[d, e, f, g, h])
 ]]>
     </Resource>
   </TestCase>
@@ -473,82 +458,136 @@ Calc(select=[c, g])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSelfJoin">
+  <TestCase name="testRightOuterJoinWithEquiPred">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM
-  (SELECT * FROM src WHERE k = 0) src1
-LEFT OUTER JOIN
-  (SELECT * from src WHERE k = 0) src2
-ON (src1.k = src2.k AND src2.k > 10)
-         ]]>
+      <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(k=[$0], v=[$1], k0=[$2], v0=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), >($2, 10))], joinType=[left])
-   :- LogicalProject(k=[$0], v=[$1])
-   :  +- LogicalFilter(condition=[=($0, 0)])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, src, 
source: [TestTableSource(k, v)]]])
-   +- LogicalProject(k=[$0], v=[$1])
-      +- LogicalFilter(condition=[=($0, 0)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, src, 
source: [TestTableSource(k, v)]]])
+LogicalProject(c=[$2], g=[$6])
++- LogicalJoin(condition=[=($1, $4)], joinType=[right])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-HashJoin(joinType=[LeftOuterJoin], where=[(k = k0)], select=[k, v, k0, v0], 
build=[right])
-:- Exchange(distribution=[hash[k]])
-:  +- Calc(select=[CAST(0 AS BIGINT) AS k, v], where=[(k = 0)])
-:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, src, 
source: [TestTableSource(k, v)]]], fields=[k, v])
-+- Exchange(distribution=[hash[k]])
-   +- Values(tuples=[[]], values=[k, v])
+Calc(select=[c, g])
++- HashJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e, g], 
build=[right])
+   :- Exchange(distribution=[hash[b]])
+   :  +- Calc(select=[b, c])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[e]])
+      +- Calc(select=[e, g])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
+  <TestCase name="testInnerJoinWithJoinConditionPushDown">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN  MyTable1 ON a = d AND d 
< 2 AND b < h]]>
+      <![CDATA[
+SELECT * FROM
+   (select a, count(b) as b from MyTable1 group by a)
+   join
+   (select d, count(e) as e from MyTable2 group by d)
+   on a = d and b = e and d = 2 and b = 1
+]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
-+- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))], 
joinType=[left])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+   :  +- LogicalProject(a=[$0], b=[$1])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
+      +- LogicalProject(d=[$0], e=[$1])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-HashJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b < h))], 
select=[d, e, f, g, h, a, b, c], build=[right])
-:- Exchange(distribution=[hash[d]])
-:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-+- Exchange(distribution=[hash[a]])
-   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
++- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(a = d)], select=[a, d], build=[right])\n:- Calc(select=[a], where=[(b = 
1)])\n:  +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_COUNT(count$0) AS b])\n:     +- [#1] Exchange(distribution=[hash[a]])\n+- 
Calc(select=[d], where=[(e = 1)])\n   +- HashAggregate(isMerge=[true], 
groupBy=[d], select=[d, Final_COUNT(count$0) AS e])\n      +- [#2] 
Exchange(distribution=[hash[d]])\n])
+   :- Exchange(distribution=[hash[a]])
+   :  +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS 
count$0])
+   :     +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
+   :        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+   +- Exchange(distribution=[hash[d]])
+      +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS 
count$0])
+         +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], 
fields=[d, e, f, g, h])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testLeftOuterJoinWithEquiPred">
+  <TestCase name="testLeftOuterJoinWithFilter2">
     <Resource name="sql">
-      <![CDATA[SELECT c, g FROM MyTable1 LEFT OUTER JOIN MyTable2 ON b = e]]>
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d IS NULL AND a < 12]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalJoin(condition=[=($1, $4)], joinType=[left])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[c, g])
-+- HashJoin(joinType=[LeftOuterJoin], where=[(b = e)], select=[b, c, e, g], 
build=[right])
-   :- Exchange(distribution=[hash[b]])
-   :  +- Calc(select=[b, c])
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- HashJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e, f], 
build=[left])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a], where=[(a < 12)])
    :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[hash[e]])
-      +- Calc(select=[e, g])
+   +- Exchange(distribution=[hash[d]])
+      +- Calc(select=[d, e, f], where=[(d < 12)])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter3">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d < 10 AND a < 12]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[d, e, f])
++- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f], 
build=[left])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a], where=[(a < 10)])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[d]])
+      +- Calc(select=[d, e, f], where=[(d < 10)])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter4">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d = null]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
@@ -594,28 +633,64 @@ Calc(select=[a, CAST(b AS BIGINT) AS b, CAST(2 AS 
INTEGER) AS d, e])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRightOuterJoinWithEquiAndLocalPred">
+  <TestCase name="testRightJoinWithJoinConditionPushDown">
     <Resource name="sql">
-      <![CDATA[SELECT c, g FROM MyTable2 RIGHT OUTER JOIN  MyTable1 ON a = d 
AND d < 2]]>
+      <![CDATA[
+SELECT * FROM
+   (select a, count(b) as b from MyTable1 group by a)
+   right join
+   (select d, count(e) as e from MyTable2 group by d)
+   on a = d and b = e and d = 2 and b = 1
+]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(c=[$7], g=[$3])
-+- LogicalJoin(condition=[AND(=($5, $0), <($0, 2))], joinType=[right])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))], 
joinType=[right])
+   :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+   :  +- LogicalProject(a=[$0], b=[$1])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
+      +- LogicalProject(d=[$0], e=[$1])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[c, g])
-+- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[d, g, a, c], 
build=[left])
-   :- Exchange(distribution=[hash[d]])
-   :  +- Calc(select=[d, g], where=[(d < 2)])
-   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, c])
+MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[RightOuterJoin], 
where=[((a = d) AND (1 = e) AND (d = 2))], select=[a, b, d, e], 
build=[left])\n:- Calc(select=[a, b], where=[(b = 1)])\n:  +- 
HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS 
b])\n:     +- [#2] Exchange(distribution=[hash[a]])\n+- 
HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS 
e])\n   +- [#1] Exchange(distribution=[hash[d]])\n])
+:- Exchange(distribution=[hash[d]])
+:  +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
+:     +- Calc(select=[d, e])
+:        +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- Exchange(distribution=[hash[a]])
+   +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
+      +- Calc(select=[a, b])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithEquiPred">
+    <Resource name="sql">
+      <![CDATA[SELECT c, g FROM MyTable1 LEFT OUTER JOIN MyTable2 ON b = e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2], g=[$6])
++- LogicalJoin(condition=[=($1, $4)], joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[c, g])
++- HashJoin(joinType=[LeftOuterJoin], where=[(b = e)], select=[b, c, e, g], 
build=[right])
+   :- Exchange(distribution=[hash[b]])
+   :  +- Calc(select=[b, c])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[e]])
+      +- Calc(select=[e, g])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
 ]]>
     </Resource>
   </TestCase>
@@ -642,64 +717,60 @@ HashJoin(joinType=[RightOuterJoin], where=[((a = d) AND 
(b < h))], select=[d, e,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRightOuterJoinWithEquiPred">
+  <TestCase name="testRightOuterJoinWithEquiAndLocalPred">
     <Resource name="sql">
-      <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
+      <![CDATA[SELECT c, g FROM MyTable2 RIGHT OUTER JOIN  MyTable1 ON a = d 
AND d < 2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalJoin(condition=[=($1, $4)], joinType=[right])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(c=[$7], g=[$3])
++- LogicalJoin(condition=[AND(=($5, $0), <($0, 2))], joinType=[right])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, g])
-+- HashJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e, g], 
build=[right])
-   :- Exchange(distribution=[hash[b]])
-   :  +- Calc(select=[b, c])
-   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[hash[e]])
-      +- Calc(select=[e, g])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[d, g, a, c], 
build=[left])
+   :- Exchange(distribution=[hash[d]])
+   :  +- Calc(select=[d, g], where=[(d < 2)])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, c])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRightJoinWithJoinConditionPushDown">
+  <TestCase name="testSelfJoin">
     <Resource name="sql">
-      <![CDATA[
-SELECT * FROM
-   (select a, count(b) as b from MyTable1 group by a)
-   right join
-   (select d, count(e) as e from MyTable2 group by d)
-   on a = d and b = e and d = 2 and b = 1
-]]>
+      <![CDATA[SELECT * FROM
+  (SELECT * FROM src WHERE k = 0) src1
+LEFT OUTER JOIN
+  (SELECT * from src WHERE k = 0) src2
+ON (src1.k = src2.k AND src2.k > 10)
+         ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))], 
joinType=[right])
-   :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
-   :  +- LogicalProject(a=[$0], b=[$1])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
-      +- LogicalProject(d=[$0], e=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(k=[$0], v=[$1], k0=[$2], v0=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), >($2, 10))], joinType=[left])
+   :- LogicalProject(k=[$0], v=[$1])
+   :  +- LogicalFilter(condition=[=($0, 0)])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, src, 
source: [TestTableSource(k, v)]]])
+   +- LogicalProject(k=[$0], v=[$1])
+      +- LogicalFilter(condition=[=($0, 0)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, src, 
source: [TestTableSource(k, v)]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[RightOuterJoin], 
where=[((a = d) AND (1 = e) AND (d = 2))], select=[a, b, d, e], 
build=[left])\n:- Calc(select=[a, b], where=[(b = 1)])\n:  +- 
HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS 
b])\n:     +- [#2] Exchange(distribution=[hash[a]])\n+- 
HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS 
e])\n   +- [#1] Exchange(distribution=[hash[d]])\n])
-:- Exchange(distribution=[hash[d]])
-:  +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
-:     +- Calc(select=[d, e])
-:        +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-+- Exchange(distribution=[hash[a]])
-   +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
-      +- Calc(select=[a, b])
-         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+HashJoin(joinType=[LeftOuterJoin], where=[(k = k0)], select=[k, v, k0, v0], 
build=[right])
+:- Exchange(distribution=[hash[k]])
+:  +- Calc(select=[CAST(0 AS BIGINT) AS k, v], where=[(k = 0)])
+:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, src, 
source: [TestTableSource(k, v)]]], fields=[k, v])
++- Exchange(distribution=[hash[k]])
+   +- Values(tuples=[[]], values=[k, v])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
index 836d0f99a90..e552689a40d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
@@ -517,6 +517,77 @@ Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
       +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS 
count$0])
          +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
             +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], 
fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter2">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d IS NULL AND a < 12]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- SortMergeJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e, 
f])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a], where=[(a < 12)])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[d]])
+      +- Calc(select=[d, e, f], where=[(d < 12)])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter3">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d < 10 AND a < 12]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[d, e, f])
++- SortMergeJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a], where=[(a < 10)])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[d]])
+      +- Calc(select=[d, e, f], where=[(d < 10)])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftOuterJoinWithFilter4">
+    <Resource name="sql">
+      <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where 
d = null]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
index 3a311ca1d60..687887b399a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
@@ -432,6 +432,194 @@ LogicalProject(b2=[$0], c2=[$1], a2=[$2], a1=[$3], 
b1=[$4], c1=[$5])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
    +- LogicalFilter(condition=[>($1, 10)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithFilter1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
< 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[<($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[<($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithFilter2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
<> 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<>($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[<>($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[<>($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithFilter3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
> 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[>($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[>($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithFilter4">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
>= 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>=($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[>=($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[>=($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithFilter5">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
<= 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<=($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[<=($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[<=($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithFilter6">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
= null]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[=($5, null)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalValues(tuples=[[]])
+   +- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithNullFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
IS NULL]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[IS NULL($0)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[IS NULL($2)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithNullFilter2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2 
IS NULL AND a1 < 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(IS NULL($5), <($0, 10))])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalValues(tuples=[[]])
+   +- LogicalValues(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -711,6 +899,30 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], 
c2=[$4], a2=[$5])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
    +- LogicalFilter(condition=[=($2, 2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithAllFiltersFromWhere">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON true WHERE b1 = b2 
AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(=($1, $3), =($2, $4), =($5, 2), >($3, 10), 
<>(COALESCE($2, $4), _UTF-16LE''))])
+   +- LogicalJoin(condition=[true], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[AND(=($1, $3), =($2, $4), <>(COALESCE($2, $4), 
_UTF-16LE''))], joinType=[inner])
+   :- LogicalFilter(condition=[>($1, 10)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[AND(=($2, 2), >($0, 10))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
     </Resource>
   </TestCase>
@@ -736,15 +948,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], 
c2=[$4], a2=[$5])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testLeftJoinWithAllFiltersFromWhere">
+  <TestCase name="testLeftJoinWithFilter1">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON true WHERE b1 = b2 
AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ]]>
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 < 
1]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalFilter(condition=[AND(=($1, $3), =($2, $4), =($5, 2), >($3, 10), 
<>(COALESCE($2, $4), _UTF-16LE''))])
-   +- LogicalJoin(condition=[true], joinType=[left])
++- LogicalFilter(condition=[<($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -752,11 +964,177 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], 
c2=[$4], a2=[$5])
     <Resource name="optimized rel plan">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4), <>(COALESCE($2, $4), 
_UTF-16LE''))], joinType=[inner])
-   :- LogicalFilter(condition=[>($1, 10)])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[<($0, 1)])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
-   +- LogicalFilter(condition=[AND(=($2, 2), >($0, 10))])
+   +- LogicalFilter(condition=[<($2, 1)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithFilter2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 
<> 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<>($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[<>($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[<>($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithFilter3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 > 
1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[>($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[>($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithFilter4">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 
>= 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>=($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[>=($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[>=($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithFilter5">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 
<= 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<=($5, 1)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[<=($0, 1)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[<=($2, 1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithFilter6">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 = 
null]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[=($5, null)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+   :- LogicalValues(tuples=[[]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithNullFilterInRightSide">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 
IS NULL]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithNullFilterInRightSide2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 
IS NULL AND a1 < 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(IS NULL($5), <($0, 10))])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalFilter(condition=[<($0, 10)])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalFilter(condition=[<($2, 10)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
     </Resource>
   </TestCase>
@@ -781,6 +1159,55 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], 
c2=[$4], a2=[$5])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
    +- LogicalFilter(condition=[AND(=($2, 2), >($0, 10), IS NOT NULL($1))])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinWithSomeFiltersFromLeftSide">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a1 = 
2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[=($0, 2)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+   :- LogicalFilter(condition=[=($0, 2)])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+   +- LogicalFilter(condition=[=($2, 2)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightJoinWithNullFilterInRightSide2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE a1 
IS NULL AND a2 < 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(IS NULL($0), <($5, 10))])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($0)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
+      :- LogicalFilter(condition=[<($0, 10)])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
+      +- LogicalFilter(condition=[<($2, 10)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
     </Resource>
   </TestCase>
@@ -808,15 +1235,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], 
c2=[$4], a2=[$5])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testLeftJoinWithSomeFiltersFromLeftSide">
+  <TestCase name="testRightJoinWithNullFilterInLeftSide">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a1 = 
2]]>
+      <![CDATA[SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE a1 
IS NULL]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalFilter(condition=[=($0, 2)])
-   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
++- LogicalFilter(condition=[IS NULL($0)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
       :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -824,10 +1251,9 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], 
c2=[$4], a2=[$5])
     <Resource name="optimized rel plan">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalJoin(condition=[=($0, $5)], joinType=[left])
-   :- LogicalFilter(condition=[=($0, 2)])
-   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
-   +- LogicalFilter(condition=[=($2, 2)])
++- LogicalFilter(condition=[IS NULL($0)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
index 795e55a6ed2..6f9fa8e6007 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
@@ -19,8 +19,6 @@ package org.apache.flink.table.planner.plan.batch.sql.join
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
 
 import org.junit.Test
@@ -36,6 +34,32 @@ abstract class JoinTestBase extends TableTestBase {
     util.verifyExecPlan("SELECT c, g FROM MyTable1, MyTable2 WHERE foo = e")
   }
 
+  @Test
+  def testLeftOuterJoinWithFilter2(): Unit = {
+    // For left/right joins, only equality filter conditions derived from
+    // the join and filter conditions are pushed into the other side. So,
+    // d IS NULL cannot be pushed into the left side.
+    util.verifyExecPlan(
+      "SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where d IS 
NULL AND a < 12")
+  }
+
+  @Test
+  def testLeftOuterJoinWithFilter3(): Unit = {
+    // For left/right joins, only equality filter conditions derived from
+    // the join and filter conditions are pushed into the other side. So,
+    // d < 10 cannot be pushed into the left side.
+    util.verifyExecPlan(
+      "SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where d < 10 
AND a < 12")
+  }
+
+  @Test
+  def testLeftOuterJoinWithFilter4(): Unit = {
+    // For left/right joins, only equality filter conditions derived from
+    // the join and filter conditions are pushed into the other side. So,
+    // d = null cannot be pushed into the left side.
+    util.verifyExecPlan("SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a 
= d where d = null")
+  }
+
   @Test(expected = classOf[TableException])
   def testJoinNonMatchingKeyTypes(): Unit = {
     // INTEGER and VARCHAR(65536) does not have common type now
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
index 476ca4089c1..f725d350d0d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.batch.sql.join
 
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 
-import org.junit.Before
+import org.junit.{Before, Test}
 
 class NestedLoopJoinTest extends JoinTestBase {
 
@@ -28,4 +28,12 @@ class NestedLoopJoinTest extends JoinTestBase {
     util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"SortMergeJoin, HashJoin")
   }
+
+  @Test
+  def testLeftOuterJoinWithFilter1(): Unit = {
+    // Only supported for nested loop join.
+    // We will push a = 10 into left side MyTable1 by derived from a = d and d 
= 10.
+    util.verifyExecPlan(
+      "SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where d = 10 
AND a < 12")
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
index 45278a10503..cb5ca93929f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
@@ -1143,6 +1143,114 @@ class JoinITCase(expectedJoinType: JoinType) extends 
BatchTestBase {
         row(6, null, 1, 6, null, 1),
         row(null, null, null, 4, 1.0, 1))
     )
+
+    checkResult(
+      """
+        |select * from
+        | l inner join r on a = c where c IS NULL
+        |""".stripMargin,
+      Seq()
+    )
+
+    checkResult(
+      """
+        |select * from
+        | l inner join r on a = c where c = NULL
+        |""".stripMargin,
+      Seq()
+    )
+
+    if (expectedJoinType == NestedLoopJoin) {
+      // For inner join, we will push c = 3 into the left side l,
+      // as derived from a = c and c = 3.
+      checkResult(
+        """
+          |select * from
+          | l inner join r on a = c where c = 3
+          |""".stripMargin,
+        Seq(
+          row(3, 3.0, 3, 2.0)
+        )
+      )
+
+      // For left join, we will push c = 3 into the left side l,
+      // as derived from a = c and c = 3.
+      checkResult(
+        """
+          |select * from
+          | l left join r on a = c where c = 3
+          |""".stripMargin,
+        Seq(
+          row(3, 3.0, 3, 2.0)
+        )
+      )
+    }
+
+    // For left/right joins, only equality filter conditions derived from
+    // the join and filter conditions are pushed into the other side. So,
+    // c IS NULL cannot be pushed into the left side.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c IS NULL
+        |""".stripMargin,
+      Seq(
+        row(1, 2.0, null, null),
+        row(1, 2.0, null, null),
+        row(null, 5.0, null, null),
+        row(null, null, null, null)
+      )
+    )
+
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c IS NULL AND a <= 1
+        |""".stripMargin,
+      Seq(
+        row(1, 2.0, null, null),
+        row(1, 2.0, null, null)
+      )
+    )
+
+    // For 'c = NULL', no data can match this condition.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c = NULL
+        |""".stripMargin,
+      Seq()
+    )
+
+    // For left/right joins, only equality filter conditions derived from
+    // the join and filter conditions are pushed into the other side. So,
+    // c < 3 cannot be pushed into the left side.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c < 3 AND a <= 3
+        |""".stripMargin,
+      Seq(
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0)
+      )
+    )
+
+    // c <> 3 cannot be pushed into the left side.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c <> 3 AND a <= 3
+        |""".stripMargin,
+      Seq(
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0)
+      )
+    )
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 86863160348..ac59bc2623f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -1501,6 +1501,75 @@ class JoinITCase(state: StateBackendMode) extends 
StreamingWithStateTestBase(sta
         row(6, null, 1, 6, null, 1),
         row(null, null, null, 4, 1.0, 1))
     )
+
+    // For a left join, we push c = 3 into the left side l by
+    // deriving it from a = c and c = 3.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c = 3
+        |""".stripMargin,
+      Seq(
+        row(3, 3.0, 3, 2.0)
+      )
+    )
+
+    // For left/right joins, only equality filter conditions are pushed to the
+    // other side, derived from the join condition and the filter condition. So
+    // c IS NULL cannot be pushed into the left side.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c IS NULL
+        |""".stripMargin,
+      Seq(
+        row(1, 2.0, null, null),
+        row(1, 2.0, null, null),
+        row(null, 5.0, null, null),
+        row(null, null, null, null)
+      )
+    )
+
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c IS NULL AND a <= 1
+        |""".stripMargin,
+      Seq(
+        row(1, 2.0, null, null),
+        row(1, 2.0, null, null)
+      )
+    )
+
+    // For left/right joins, only equality filter conditions are pushed to the
+    // other side, derived from the join condition and the filter condition. So
+    // c < 3 cannot be pushed into the left side.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c < 3 AND a <= 3
+        |""".stripMargin,
+      Seq(
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0)
+      )
+    )
+
+    // c <> 3 cannot be pushed into the left side.
+    checkResult(
+      """
+        |select * from
+        | l left join r on a = c where c <> 3 AND a <= 3
+        |""".stripMargin,
+      Seq(
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0),
+        row(2, 1.0, 2, 3.0)
+      )
+    )
   }
 
   @Test


Reply via email to