This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 33278628dc5 [FLINK-31273][table-planner] Fix left join with IS_NULL filter being wrongly pushed down and producing wrong join results
33278628dc5 is described below
commit 33278628dc599bed8944733efb9495ce77993d4b
Author: zhengyunhong.zyh <[email protected]>
AuthorDate: Wed Mar 1 15:28:50 2023 +0800
[FLINK-31273][table-planner] Fix left join with IS_NULL filter being wrongly
pushed down and producing wrong join results
This closes #22049
(cherry picked from commit 8990822bd77d70f3249e1220a853e16dadd8ef54)
---
.../plan/rules/logical/FlinkFilterJoinRule.java | 44 +-
.../rules/logical/FlinkFilterJoinRuleTest.java | 102 +++++
.../plan/batch/sql/join/BroadcastHashJoinTest.xml | 69 ++++
.../plan/batch/sql/join/NestedLoopJoinTest.xml | 214 +++++++---
.../plan/batch/sql/join/ShuffledHashJoinTest.xml | 335 +++++++++------
.../plan/batch/sql/join/SortMergeJoinTest.xml | 71 ++++
.../plan/rules/logical/FlinkFilterJoinRuleTest.xml | 456 ++++++++++++++++++++-
.../planner/plan/batch/sql/join/JoinTestBase.scala | 28 +-
.../plan/batch/sql/join/NestedLoopJoinTest.scala | 10 +-
.../runtime/batch/sql/join/JoinITCase.scala | 108 +++++
.../planner/runtime/stream/sql/JoinITCase.scala | 69 ++++
11 files changed, 1295 insertions(+), 211 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
index 47d7a44f69e..375d2ff1814 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java
@@ -36,11 +36,13 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
@@ -53,9 +55,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.calcite.plan.RelOptUtil.conjunctions;
@@ -80,6 +84,20 @@ public abstract class FlinkFilterJoinRule<C extends
FlinkFilterJoinRule.Config>
public static final FlinkJoinConditionPushRule JOIN_CONDITION_PUSH =
FlinkJoinConditionPushRule.FlinkFilterJoinRuleConfig.DEFAULT.toRule();
+    // For left/right join, not all filter conditions can be pushed to the other side after
+    // deduction. This set specifies the comparison kinds that are supported.
+    // It is unmodifiable so that this shared public constant cannot be altered by callers.
+    public static final Set<SqlKind> SUITABLE_FILTER_TO_PUSH =
+            Collections.unmodifiableSet(
+                    new HashSet<>(
+                            Arrays.asList(
+                                    SqlKind.EQUALS,
+                                    SqlKind.GREATER_THAN,
+                                    SqlKind.GREATER_THAN_OR_EQUAL,
+                                    SqlKind.LESS_THAN,
+                                    SqlKind.LESS_THAN_OR_EQUAL,
+                                    SqlKind.NOT_EQUALS)));
+
/** Creates a FilterJoinRule. */
protected FlinkFilterJoinRule(C config) {
super(config);
@@ -353,7 +371,7 @@ public abstract class FlinkFilterJoinRule<C extends
FlinkFilterJoinRule.Config>
for (RexNode filter : filtersToPush) {
final RelOptUtil.InputFinder inputFinder =
RelOptUtil.InputFinder.analyze(filter);
final ImmutableBitSet inputBits = inputFinder.build();
- if (filter.isAlwaysTrue()) {
+ if (!isSuitableFilterToPush(filter, joinType)) {
continue;
}
@@ -386,6 +404,30 @@ public abstract class FlinkFilterJoinRule<C extends
FlinkFilterJoinRule.Config>
}
}
+ /**
+ * Decides whether {@code filter}, a filter deduced for one join input, may be pushed to the
+ * other join input for the given join type.
+ *
+ * <p>Always-true filters are never pushed. For INNER joins any other filter is accepted. For
+ * LEFT/RIGHT joins only a {@link RexCall} whose kind is in {@code SUITABLE_FILTER_TO_PUSH}
+ * and whose first or second operand is a {@link RexLiteral} is accepted. Every other join
+ * type, and every other filter shape, is rejected.
+ *
+ * @param filter candidate filter to push
+ * @param joinType type of the join being processed
+ * @return {@code true} if the filter may be pushed, {@code false} otherwise
+ */
+ private boolean isSuitableFilterToPush(RexNode filter, JoinRelType
joinType) {
+ if (filter.isAlwaysTrue()) {
+ return false;
+ }
+ if (joinType == JoinRelType.INNER) {
+ return true;
+ }
+ // For LEFT/RIGHT outer joins we currently only push the comparison kinds listed in
+ // SUITABLE_FILTER_TO_PUSH, and only when one operand is a literal. Example: in a LEFT join
+ // a right-side filter 'a2 IS NULL' does not imply 'a1 IS NULL' (the NULL may come from an
+ // unmatched left row), so pushing it to the left input, possibly next to other filters on
+ // that column, would conflict with them and produce a wrong plan.
+ if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.RIGHT)
+ && filter instanceof RexCall) {
+ RexCall rexCall = (RexCall) filter;
+ // Assumes the kinds in SUITABLE_FILTER_TO_PUSH are binary comparisons, so operands 0
+ // and 1 both exist.
+ if (SUITABLE_FILTER_TO_PUSH.contains(rexCall.op.kind)
+ && (rexCall.getOperands().get(0) instanceof RexLiteral
+ || rexCall.getOperands().get(1) instanceof
RexLiteral)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private RexNode remapFilter(
ImmutableIntList oldKeys,
ImmutableIntList newKeys,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
index c1d195dceef..4ec5f93f6cc 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.java
@@ -216,6 +216,48 @@ public class FlinkFilterJoinRuleTest extends TableTestBase
{
"SELECT * FROM MyTable2, MyTable1 WHERE b1 = b2 AND c1 = c2
AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ");
}
+ @Test
+ public void testInnerJoinWithNullFilter() {
+ // INNER join: FlinkFilterJoinRule#isSuitableFilterToPush accepts every filter that is not
+ // always true (including IS NULL), so this exercises the unrestricted path.
+ util.verifyRelPlan(
+ "SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE
a2 IS NULL");
+ }
+
+ @Test
+ public void testInnerJoinWithNullFilter2() {
+ util.verifyRelPlan(
+ "SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE
a2 IS NULL AND a1 < 10");
+ }
+
+ @Test
+ public void testInnerJoinWithFilter1() {
+ // testInnerJoinWithFilter1..6 cover each comparison kind of
+ // FlinkFilterJoinRule#SUITABLE_FILTER_TO_PUSH (<, <>, >, >=, <=, =) against a literal.
+ util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 =
a2 WHERE a2 < 1");
+ }
+
+ @Test
+ public void testInnerJoinWithFilter2() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 =
a2 WHERE a2 <> 1");
+ }
+
+ @Test
+ public void testInnerJoinWithFilter3() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 =
a2 WHERE a2 > 1");
+ }
+
+ @Test
+ public void testInnerJoinWithFilter4() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 =
a2 WHERE a2 >= 1");
+ }
+
+ @Test
+ public void testInnerJoinWithFilter5() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 =
a2 WHERE a2 <= 1");
+ }
+
+ @Test
+ public void testInnerJoinWithFilter6() {
+ // 'a2 = null' compares against a NULL literal, which is never TRUE in SQL.
+ util.verifyRelPlan("SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 =
a2 WHERE a2 = null");
+ }
+
@Test
public void testLeftJoinWithSomeFiltersFromLeftSide() {
util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a1 = 2");
@@ -240,6 +282,50 @@ public class FlinkFilterJoinRuleTest extends TableTestBase
{
"SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON true WHERE b1 =
b2 AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ");
}
+ @Test
+ public void testLeftJoinWithNullFilterInRightSide() {
+ // A right-side filter 'a2 IS NULL' does not allow deriving 'a1 IS NULL' for a LEFT join,
+ // so the filter must not be pushed down to both MyTable1 and MyTable2.
+ util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a2 IS NULL");
+ }
+
+ @Test
+ public void testLeftJoinWithNullFilterInRightSide2() {
+ // 'a2 IS NULL' does not imply 'a1 IS NULL'.
+ util.verifyRelPlan(
+ "SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2
IS NULL AND a1 < 10");
+ }
+
+ @Test
+ public void testLeftJoinWithFilter1() {
+ // testLeftJoinWithFilter1..6 cover each comparison kind of
+ // FlinkFilterJoinRule#SUITABLE_FILTER_TO_PUSH (<, <>, >, >=, <=, =) against a literal,
+ // which is the only filter shape the rule may push across a LEFT join.
+ util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a2 < 1");
+ }
+
+ @Test
+ public void testLeftJoinWithFilter2() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a2 <> 1");
+ }
+
+ @Test
+ public void testLeftJoinWithFilter3() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a2 > 1");
+ }
+
+ @Test
+ public void testLeftJoinWithFilter4() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a2 >= 1");
+ }
+
+ @Test
+ public void testLeftJoinWithFilter5() {
+ util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a2 <= 1");
+ }
+
+ @Test
+ public void testLeftJoinWithFilter6() {
+ // 'a2 = null' compares against a NULL literal, which is never TRUE in SQL.
+ util.verifyRelPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 =
a2 WHERE a2 = null");
+ }
+
@Test
public void testRightJoinWithAllFilterInONClause() {
util.verifyRelPlan("SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 =
a2 AND a1 = 2");
@@ -264,6 +350,22 @@ public class FlinkFilterJoinRuleTest extends TableTestBase
{
"SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON true WHERE b1 =
b2 AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ");
}
+ @Test
+ public void testRightJoinWithNullFilterInLeftSide() {
+ // A left-side filter 'a1 IS NULL' does not allow deriving 'a2 IS NULL' for a RIGHT join,
+ // so the filter must not be pushed down to both MyTable1 and MyTable2.
+ util.verifyRelPlan(
+ "SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE
a1 IS NULL");
+ }
+
+ @Test
+ public void testRightJoinWithNullFilterInRightSide2() {
+ // 'a1 IS NULL' does not imply 'a2 IS NULL'. However, 'a2 < 10' implies 'a1 < 10', and both
+ // of these comparisons can be pushed down.
+ // NOTE(review): despite the name, the IS NULL filter is on a1 (MyTable1, the left input),
+ // like testRightJoinWithNullFilterInLeftSide; renaming requires updating the plan XML.
+ util.verifyRelPlan(
+ "SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE
a1 IS NULL AND a2 < 10");
+ }
+
@Test
public void testFullJoinWithAllFilterInONClause() {
util.verifyRelPlan("SELECT * FROM MyTable1 FULL JOIN MyTable2 ON a1 =
a2 AND a1 = 2");
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
index 20d2814d731..77015bd4c71 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
@@ -410,6 +410,75 @@ Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS
count$0])
+- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
+- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter2">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d IS NULL AND a < 12]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- HashJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e, f],
isBroadcast=[true], build=[right])
+ :- Calc(select=[a], where=[(a < 12)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[d, e, f], where=[(d < 12)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter3">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d < 10 AND a < 12]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[d, e, f])
++- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f],
isBroadcast=[true], build=[left])
+ :- Exchange(distribution=[broadcast])
+ : +- Calc(select=[a], where=[(a < 10)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[d, e, f], where=[(d < 10)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter4">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d = null]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
index 9c8477cde77..309804ae6f4 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
@@ -223,27 +223,6 @@ Calc(select=[c, g])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+- Calc(select=[a, c], where=[(a < 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
- <Resource name="sql">
- <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN MyTable1 ON a = d AND d
< 2 AND b < h]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
-+- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))],
joinType=[left])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
-NestedLoopJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b <
h))], select=[d, e, f, g, h, a, b, c], build=[right])
-:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-+- Exchange(distribution=[broadcast])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -381,6 +360,46 @@ Calc(select=[c, g])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Calc(select=[d, e, g])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithJoinConditionPushDown">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM
+ (select a, count(b) as b from MyTable1 group by a)
+ join
+ (select d, count(e) as e from MyTable2 group by d)
+ on a = d and b = e and d = 2 and b = 1
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))],
joinType=[inner])
+ :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+ : +- LogicalProject(a=[$0], b=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
++- MultipleInput(readOrder=[0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d],
build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- Calc(select=[d],
where=[(e = 1)])\n +- HashAggregate(isMerge=[true], groupBy=[d], select=[d,
Final_COUNT(count$0) AS e])\n +- [#2] Exchange(distribution=[hash[d]])\n])
+ :- Exchange(distribution=[broadcast])
+ : +- Calc(select=[a], where=[(b = 1)])
+ : +- HashAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_COUNT(count$0) AS b])
+ : +- Exchange(distribution=[hash[a]])
+ : +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b)
AS count$0])
+ : +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
+ +- Exchange(distribution=[hash[d]])
+ +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS
count$0])
+ +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]],
fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
@@ -621,67 +640,49 @@ Calc(select=[c, g])
]]>
</Resource>
</TestCase>
- <TestCase name="testRightOuterJoinWithEquiPred">
+ <TestCase name="testLeftOuterJoinWithFilter1">
<Resource name="sql">
- <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d = 10 AND a < 12]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalJoin(condition=[=($1, $4)], joinType=[right])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($3, 10), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[c, g])
-+- NestedLoopJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e,
g], build=[left])
+Calc(select=[CAST(10 AS INTEGER) AS d, e, f])
++- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, e, f],
build=[left])
:- Exchange(distribution=[broadcast])
- : +- Calc(select=[b, c])
+ : +- Calc(select=[a], where=[(a = 10)])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Calc(select=[e, g])
+ +- Calc(select=[e, f], where=[(d = 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
- <TestCase name="testInnerJoinWithJoinConditionPushDown">
+ <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
<Resource name="sql">
- <![CDATA[
-SELECT * FROM
- (select a, count(b) as b from MyTable1 group by a)
- join
- (select d, count(e) as e from MyTable2 group by d)
- on a = d and b = e and d = 2 and b = 1
-]]>
+ <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN MyTable1 ON a = d AND d
< 2 AND b < h]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))],
joinType=[inner])
- :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
- : +- LogicalProject(a=[$0], b=[$1])
- : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]])
- +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
- +- LogicalProject(d=[$0], e=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
++- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))],
joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
-+- MultipleInput(readOrder=[0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d],
build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- Calc(select=[d],
where=[(e = 1)])\n +- HashAggregate(isMerge=[true], groupBy=[d], select=[d,
Final_COUNT(count$0) AS e])\n +- [#2] Exchange(distribution=[hash[d]])\n])
- :- Exchange(distribution=[broadcast])
- : +- Calc(select=[a], where=[(b = 1)])
- : +- HashAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_COUNT(count$0) AS b])
- : +- Exchange(distribution=[hash[a]])
- : +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b)
AS count$0])
- : +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
- : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
- +- Exchange(distribution=[hash[d]])
- +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS
count$0])
- +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]],
fields=[d, e, f, g, h])
+NestedLoopJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b <
h))], select=[d, e, f, g, h, a, b, c], build=[right])
+:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- Exchange(distribution=[broadcast])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -706,6 +707,75 @@ Calc(select=[c, g])
+- Exchange(distribution=[broadcast])
+- Calc(select=[e, g])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter2">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d IS NULL AND a < 12]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- NestedLoopJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e,
f], build=[right])
+ :- Calc(select=[a], where=[(a < 12)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[d, e, f], where=[(d < 12)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter3">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d < 10 AND a < 12]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[d, e, f])
++- NestedLoopJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f],
build=[left])
+ :- Exchange(distribution=[broadcast])
+ : +- Calc(select=[a], where=[(a < 10)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[d, e, f], where=[(d < 10)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter4">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d = null]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
]]>
</Resource>
</TestCase>
@@ -746,6 +816,30 @@ Calc(select=[a, CAST(b AS BIGINT) AS b, CAST(2 AS INTEGER)
AS d, e])
+- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS
count$0])
+- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
+- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]],
fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinWithEquiPred">
+ <Resource name="sql">
+ <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], g=[$6])
++- LogicalJoin(condition=[=($1, $4)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[c, g])
++- NestedLoopJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e,
g], build=[left])
+ :- Exchange(distribution=[broadcast])
+ : +- Calc(select=[b, c])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[e, g])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
index 8b50a7ce09f..ac56e90fd11 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.xml
@@ -137,6 +137,28 @@ Calc(select=[c, g])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, c], where=[(a < 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN MyTable1 ON a = d AND d
< 2 AND b < h]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
++- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))],
joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b < h))],
select=[d, e, f, g, h, a, b, c], build=[right])
+:- Exchange(distribution=[hash[d]])
+: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- Exchange(distribution=[hash[a]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -280,43 +302,6 @@ Calc(select=[c, g])
+- Exchange(distribution=[hash[e, d]])
+- Calc(select=[d, e, g])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testInnerJoinWithJoinConditionPushDown">
- <Resource name="sql">
- <![CDATA[
-SELECT * FROM
- (select a, count(b) as b from MyTable1 group by a)
- join
- (select d, count(e) as e from MyTable2 group by d)
- on a = d and b = e and d = 2 and b = 1
-]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))],
joinType=[inner])
- :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
- : +- LogicalProject(a=[$0], b=[$1])
- : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]])
- +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
- +- LogicalProject(d=[$0], e=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
-Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
-+- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin],
where=[(a = d)], select=[a, d], build=[right])\n:- Calc(select=[a], where=[(b =
1)])\n: +- HashAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_COUNT(count$0) AS b])\n: +- [#1] Exchange(distribution=[hash[a]])\n+-
Calc(select=[d], where=[(e = 1)])\n +- HashAggregate(isMerge=[true],
groupBy=[d], select=[d, Final_COUNT(count$0) AS e])\n +- [#2]
Exchange(distribution=[hash[d]])\n])
- :- Exchange(distribution=[hash[a]])
- : +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS
count$0])
- : +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
- : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
- +- Exchange(distribution=[hash[d]])
- +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS
count$0])
- +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]],
fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
@@ -473,82 +458,136 @@ Calc(select=[c, g])
]]>
</Resource>
</TestCase>
- <TestCase name="testSelfJoin">
+ <TestCase name="testRightOuterJoinWithEquiPred">
<Resource name="sql">
- <![CDATA[SELECT * FROM
- (SELECT * FROM src WHERE k = 0) src1
-LEFT OUTER JOIN
- (SELECT * from src WHERE k = 0) src2
-ON (src1.k = src2.k AND src2.k > 10)
- ]]>
+ <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(k=[$0], v=[$1], k0=[$2], v0=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), >($2, 10))], joinType=[left])
- :- LogicalProject(k=[$0], v=[$1])
- : +- LogicalFilter(condition=[=($0, 0)])
- : +- LogicalTableScan(table=[[default_catalog, default_database, src,
source: [TestTableSource(k, v)]]])
- +- LogicalProject(k=[$0], v=[$1])
- +- LogicalFilter(condition=[=($0, 0)])
- +- LogicalTableScan(table=[[default_catalog, default_database, src,
source: [TestTableSource(k, v)]]])
+LogicalProject(c=[$2], g=[$6])
++- LogicalJoin(condition=[=($1, $4)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-HashJoin(joinType=[LeftOuterJoin], where=[(k = k0)], select=[k, v, k0, v0],
build=[right])
-:- Exchange(distribution=[hash[k]])
-: +- Calc(select=[CAST(0 AS BIGINT) AS k, v], where=[(k = 0)])
-: +- LegacyTableSourceScan(table=[[default_catalog, default_database, src,
source: [TestTableSource(k, v)]]], fields=[k, v])
-+- Exchange(distribution=[hash[k]])
- +- Values(tuples=[[]], values=[k, v])
+Calc(select=[c, g])
++- HashJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e, g],
build=[right])
+ :- Exchange(distribution=[hash[b]])
+ : +- Calc(select=[b, c])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[e]])
+ +- Calc(select=[e, g])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
- <TestCase name="testLeftOuterJoinWithEquiAndNonEquiPred">
+ <TestCase name="testInnerJoinWithJoinConditionPushDown">
<Resource name="sql">
- <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN MyTable1 ON a = d AND d
< 2 AND b < h]]>
+ <![CDATA[
+SELECT * FROM
+ (select a, count(b) as b from MyTable1 group by a)
+ join
+ (select d, count(e) as e from MyTable2 group by d)
+ on a = d and b = e and d = 2 and b = 1
+]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
-+- LogicalJoin(condition=[AND(=($5, $0), <($0, 2), <($6, $4))],
joinType=[left])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))],
joinType=[inner])
+ :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+ : +- LogicalProject(a=[$0], b=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-HashJoin(joinType=[LeftOuterJoin], where=[((a = d) AND (d < 2) AND (b < h))],
select=[d, e, f, g, h, a, b, c], build=[right])
-:- Exchange(distribution=[hash[d]])
-: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-+- Exchange(distribution=[hash[a]])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
++- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin],
where=[(a = d)], select=[a, d], build=[right])\n:- Calc(select=[a], where=[(b =
1)])\n: +- HashAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_COUNT(count$0) AS b])\n: +- [#1] Exchange(distribution=[hash[a]])\n+-
Calc(select=[d], where=[(e = 1)])\n +- HashAggregate(isMerge=[true],
groupBy=[d], select=[d, Final_COUNT(count$0) AS e])\n +- [#2]
Exchange(distribution=[hash[d]])\n])
+ :- Exchange(distribution=[hash[a]])
+ : +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS
count$0])
+ : +- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
+ +- Exchange(distribution=[hash[d]])
+ +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS
count$0])
+ +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]],
fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
- <TestCase name="testLeftOuterJoinWithEquiPred">
+ <TestCase name="testLeftOuterJoinWithFilter2">
<Resource name="sql">
- <![CDATA[SELECT c, g FROM MyTable1 LEFT OUTER JOIN MyTable2 ON b = e]]>
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d IS NULL AND a < 12]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalJoin(condition=[=($1, $4)], joinType=[left])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[c, g])
-+- HashJoin(joinType=[LeftOuterJoin], where=[(b = e)], select=[b, c, e, g],
build=[right])
- :- Exchange(distribution=[hash[b]])
- : +- Calc(select=[b, c])
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- HashJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e, f],
build=[left])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a], where=[(a < 12)])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[hash[e]])
- +- Calc(select=[e, g])
+ +- Exchange(distribution=[hash[d]])
+ +- Calc(select=[d, e, f], where=[(d < 12)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter3">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d < 10 AND a < 12]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[d, e, f])
++- HashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f],
build=[left])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a], where=[(a < 10)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[d]])
+ +- Calc(select=[d, e, f], where=[(d < 10)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter4">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d = null]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
]]>
</Resource>
</TestCase>
@@ -594,28 +633,64 @@ Calc(select=[a, CAST(b AS BIGINT) AS b, CAST(2 AS
INTEGER) AS d, e])
]]>
</Resource>
</TestCase>
- <TestCase name="testRightOuterJoinWithEquiAndLocalPred">
+ <TestCase name="testRightJoinWithJoinConditionPushDown">
<Resource name="sql">
- <![CDATA[SELECT c, g FROM MyTable2 RIGHT OUTER JOIN MyTable1 ON a = d
AND d < 2]]>
+ <![CDATA[
+SELECT * FROM
+ (select a, count(b) as b from MyTable1 group by a)
+ right join
+ (select d, count(e) as e from MyTable2 group by d)
+ on a = d and b = e and d = 2 and b = 1
+]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(c=[$7], g=[$3])
-+- LogicalJoin(condition=[AND(=($5, $0), <($0, 2))], joinType=[right])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))],
joinType=[right])
+ :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
+ : +- LogicalProject(a=[$0], b=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[c, g])
-+- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[d, g, a, c],
build=[left])
- :- Exchange(distribution=[hash[d]])
- : +- Calc(select=[d, g], where=[(d < 2)])
- : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, c])
+MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[RightOuterJoin],
where=[((a = d) AND (1 = e) AND (d = 2))], select=[a, b, d, e],
build=[left])\n:- Calc(select=[a, b], where=[(b = 1)])\n: +-
HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS
b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+-
HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS
e])\n +- [#1] Exchange(distribution=[hash[d]])\n])
+:- Exchange(distribution=[hash[d]])
+: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
+: +- Calc(select=[d, e])
+: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- Exchange(distribution=[hash[a]])
+ +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
+ +- Calc(select=[a, b])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithEquiPred">
+ <Resource name="sql">
+ <![CDATA[SELECT c, g FROM MyTable1 LEFT OUTER JOIN MyTable2 ON b = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2], g=[$6])
++- LogicalJoin(condition=[=($1, $4)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[c, g])
++- HashJoin(joinType=[LeftOuterJoin], where=[(b = e)], select=[b, c, e, g],
build=[right])
+ :- Exchange(distribution=[hash[b]])
+ : +- Calc(select=[b, c])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[e]])
+ +- Calc(select=[e, g])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
@@ -642,64 +717,60 @@ HashJoin(joinType=[RightOuterJoin], where=[((a = d) AND
(b < h))], select=[d, e,
]]>
</Resource>
</TestCase>
- <TestCase name="testRightOuterJoinWithEquiPred">
+ <TestCase name="testRightOuterJoinWithEquiAndLocalPred">
<Resource name="sql">
- <![CDATA[SELECT c, g FROM MyTable1 RIGHT OUTER JOIN MyTable2 ON b = e]]>
+ <![CDATA[SELECT c, g FROM MyTable2 RIGHT OUTER JOIN MyTable1 ON a = d
AND d < 2]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalJoin(condition=[=($1, $4)], joinType=[right])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(c=[$7], g=[$3])
++- LogicalJoin(condition=[AND(=($5, $0), <($0, 2))], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, g])
-+- HashJoin(joinType=[RightOuterJoin], where=[(b = e)], select=[b, c, e, g],
build=[right])
- :- Exchange(distribution=[hash[b]])
- : +- Calc(select=[b, c])
- : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[hash[e]])
- +- Calc(select=[e, g])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
++- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[d, g, a, c],
build=[left])
+ :- Exchange(distribution=[hash[d]])
+ : +- Calc(select=[d, g], where=[(d < 2)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, c])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
- <TestCase name="testRightJoinWithJoinConditionPushDown">
+ <TestCase name="testSelfJoin">
<Resource name="sql">
- <![CDATA[
-SELECT * FROM
- (select a, count(b) as b from MyTable1 group by a)
- right join
- (select d, count(e) as e from MyTable2 group by d)
- on a = d and b = e and d = 2 and b = 1
-]]>
+ <![CDATA[SELECT * FROM
+ (SELECT * FROM src WHERE k = 0) src1
+LEFT OUTER JOIN
+ (SELECT * from src WHERE k = 0) src2
+ON (src1.k = src2.k AND src2.k > 10)
+ ]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
-+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))],
joinType=[right])
- :- LogicalAggregate(group=[{0}], b=[COUNT($1)])
- : +- LogicalProject(a=[$0], b=[$1])
- : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]])
- +- LogicalAggregate(group=[{0}], e=[COUNT($1)])
- +- LogicalProject(d=[$0], e=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+LogicalProject(k=[$0], v=[$1], k0=[$2], v0=[$3])
++- LogicalJoin(condition=[AND(=($0, $2), >($2, 10))], joinType=[left])
+ :- LogicalProject(k=[$0], v=[$1])
+ : +- LogicalFilter(condition=[=($0, 0)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, src,
source: [TestTableSource(k, v)]]])
+ +- LogicalProject(k=[$0], v=[$1])
+ +- LogicalFilter(condition=[=($0, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src,
source: [TestTableSource(k, v)]]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[RightOuterJoin],
where=[((a = d) AND (1 = e) AND (d = 2))], select=[a, b, d, e],
build=[left])\n:- Calc(select=[a, b], where=[(b = 1)])\n: +-
HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS
b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+-
HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS
e])\n +- [#1] Exchange(distribution=[hash[d]])\n])
-:- Exchange(distribution=[hash[d]])
-: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
-: +- Calc(select=[d, e])
-: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-+- Exchange(distribution=[hash[a]])
- +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
- +- Calc(select=[a, b])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+HashJoin(joinType=[LeftOuterJoin], where=[(k = k0)], select=[k, v, k0, v0],
build=[right])
+:- Exchange(distribution=[hash[k]])
+: +- Calc(select=[CAST(0 AS BIGINT) AS k, v], where=[(k = 0)])
+: +- LegacyTableSourceScan(table=[[default_catalog, default_database, src,
source: [TestTableSource(k, v)]]], fields=[k, v])
++- Exchange(distribution=[hash[k]])
+ +- Values(tuples=[[]], values=[k, v])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
index 836d0f99a90..e552689a40d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
@@ -517,6 +517,77 @@ Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
+- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS
count$0])
+- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
+- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]],
fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter2">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d IS NULL AND a < 12]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(IS NULL($3), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[null:INTEGER AS d, e, f], where=[d IS NULL])
++- SortMergeJoin(joinType=[LeftOuterJoin], where=[(a = d)], select=[a, d, e,
f])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a], where=[(a < 12)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[d]])
+ +- Calc(select=[d, e, f], where=[(d < 12)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter3">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d < 10 AND a < 12]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(<($3, 10), <($0, 12))])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[d, e, f])
++- SortMergeJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d, e, f])
+ :- Exchange(distribution=[hash[a]])
+ : +- Calc(select=[a], where=[(a < 10)])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[d]])
+ +- Calc(select=[d, e, f], where=[(d < 10)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinWithFilter4">
+ <Resource name="sql">
+ <![CDATA[SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where
d = null]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($3, null)])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Values(tuples=[[]], values=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
index 3a311ca1d60..687887b399a 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRuleTest.xml
@@ -432,6 +432,194 @@ LogicalProject(b2=[$0], c2=[$1], a2=[$2], a1=[$3],
b1=[$4], c1=[$5])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+- LogicalFilter(condition=[>($1, 10)])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithFilter1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
< 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[<($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[<($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithFilter2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
<> 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<>($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[<>($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[<>($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithFilter3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
> 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[>($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[>($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithFilter4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
>= 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>=($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[>=($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[>=($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithFilter5">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
<= 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<=($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[<=($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[<=($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithFilter6">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
= null]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[=($5, null)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalValues(tuples=[[]])
+ +- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithNullFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
IS NULL]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[IS NULL($0)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[IS NULL($2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithNullFilter2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 INNER JOIN MyTable2 ON a1 = a2 WHERE a2
IS NULL AND a1 < 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(IS NULL($5), <($0, 10))])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalValues(tuples=[[]])
+ +- LogicalValues(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -711,6 +899,30 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3],
c2=[$4], a2=[$5])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalFilter(condition=[=($2, 2)])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithAllFiltersFromWhere">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON true WHERE b1 = b2
AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(=($1, $3), =($2, $4), =($5, 2), >($3, 10),
<>(COALESCE($2, $4), _UTF-16LE''))])
+ +- LogicalJoin(condition=[true], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[AND(=($1, $3), =($2, $4), <>(COALESCE($2, $4),
_UTF-16LE''))], joinType=[inner])
+ :- LogicalFilter(condition=[>($1, 10)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[AND(=($2, 2), >($0, 10))])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
</Resource>
</TestCase>
@@ -736,15 +948,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3],
c2=[$4], a2=[$5])
]]>
</Resource>
</TestCase>
- <TestCase name="testLeftJoinWithAllFiltersFromWhere">
+ <TestCase name="testLeftJoinWithFilter1">
<Resource name="sql">
- <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON true WHERE b1 = b2
AND c1 = c2 AND a2 = 2 AND b2 > 10 AND COALESCE(c1, c2) <> '' ]]>
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 <
1]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalFilter(condition=[AND(=($1, $3), =($2, $4), =($5, 2), >($3, 10),
<>(COALESCE($2, $4), _UTF-16LE''))])
- +- LogicalJoin(condition=[true], joinType=[left])
++- LogicalFilter(condition=[<($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -752,11 +964,177 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3],
c2=[$4], a2=[$5])
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4), <>(COALESCE($2, $4),
_UTF-16LE''))], joinType=[inner])
- :- LogicalFilter(condition=[>($1, 10)])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[<($0, 1)])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
- +- LogicalFilter(condition=[AND(=($2, 2), >($0, 10))])
+ +- LogicalFilter(condition=[<($2, 1)])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithFilter2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2
<> 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<>($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[<>($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[<>($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithFilter3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 >
1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[>($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[>($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithFilter4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2
>= 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[>=($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[>=($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[>=($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithFilter5">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2
<= 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[<=($5, 1)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[<=($0, 1)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[<=($2, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithFilter6">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2 =
null]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[=($5, null)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalValues(tuples=[[]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithNullFilterInRightSide">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2
IS NULL]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithNullFilterInRightSide2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a2
IS NULL AND a1 < 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(IS NULL($5), <($0, 10))])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($5)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalFilter(condition=[<($0, 10)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[<($2, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
</Resource>
</TestCase>
@@ -781,6 +1159,55 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3],
c2=[$4], a2=[$5])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+- LogicalFilter(condition=[AND(=($2, 2), >($0, 10), IS NOT NULL($1))])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinWithSomeFiltersFromLeftSide">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a1 =
2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[=($0, 2)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ :- LogicalFilter(condition=[=($0, 2)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[=($2, 2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightJoinWithNullFilterInRightSide2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE a1
IS NULL AND a2 < 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[AND(IS NULL($0), <($5, 10))])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
++- LogicalFilter(condition=[IS NULL($0)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
+ :- LogicalFilter(condition=[<($0, 10)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+ +- LogicalFilter(condition=[<($2, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
</Resource>
</TestCase>
@@ -808,15 +1235,15 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3],
c2=[$4], a2=[$5])
]]>
</Resource>
</TestCase>
- <TestCase name="testLeftJoinWithSomeFiltersFromLeftSide">
+ <TestCase name="testRightJoinWithNullFilterInLeftSide">
<Resource name="sql">
- <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON a1 = a2 WHERE a1 =
2]]>
+ <![CDATA[SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON a1 = a2 WHERE a1
IS NULL]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalFilter(condition=[=($0, 2)])
- +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
++- LogicalFilter(condition=[IS NULL($0)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -824,10 +1251,9 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3],
c2=[$4], a2=[$5])
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], c1=[$2], b2=[$3], c2=[$4], a2=[$5])
-+- LogicalJoin(condition=[=($0, $5)], joinType=[left])
- :- LogicalFilter(condition=[=($0, 2)])
- : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
- +- LogicalFilter(condition=[=($2, 2)])
++- LogicalFilter(condition=[IS NULL($0)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
index 795e55a6ed2..6f9fa8e6007 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/JoinTestBase.scala
@@ -19,8 +19,6 @@ package org.apache.flink.table.planner.plan.batch.sql.join
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
import org.junit.Test
@@ -36,6 +34,32 @@ abstract class JoinTestBase extends TableTestBase {
util.verifyExecPlan("SELECT c, g FROM MyTable1, MyTable2 WHERE foo = e")
}
+ @Test
+ def testLeftOuterJoinWithFilter2(): Unit = {
+ // For left/right join, we will only push equal filter condition into
+ // other side by derived from join condition and filter condition. So,
+ // d IS NULL cannot be push into left side.
+ util.verifyExecPlan(
+ "SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where d IS
NULL AND a < 12")
+ }
+
+ @Test
+ def testLeftOuterJoinWithFilter3(): Unit = {
+ // For left/right join, we will only push equal filter condition into
+ // other side by derived from join condition and filter condition. So,
+ // d < 10 cannot be push into left side.
+ util.verifyExecPlan(
+ "SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where d < 10
AND a < 12")
+ }
+
+ @Test
+ def testLeftOuterJoinWithFilter4(): Unit = {
+ // For left/right join, we will only push equal filter condition into
+ // other side by derived from join condition and filter condition. So,
+ // d = null cannot be push into left side.
+ util.verifyExecPlan("SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a
= d where d = null")
+ }
+
@Test(expected = classOf[TableException])
def testJoinNonMatchingKeyTypes(): Unit = {
// INTEGER and VARCHAR(65536) does not have common type now
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
index 476ca4089c1..f725d350d0d 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.batch.sql.join
import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.junit.Before
+import org.junit.{Before, Test}
class NestedLoopJoinTest extends JoinTestBase {
@@ -28,4 +28,12 @@ class NestedLoopJoinTest extends JoinTestBase {
util.tableEnv.getConfig
.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"SortMergeJoin, HashJoin")
}
+
+ @Test
+ def testLeftOuterJoinWithFilter1(): Unit = {
+ // Only support for nested loop join.
+ // We will push a = 10 into left side MyTable1 by derived from a = d and d
= 10.
+ util.verifyExecPlan(
+ "SELECT d, e, f FROM MyTable1 LEFT JOIN MyTable2 ON a = d where d = 10
AND a < 12")
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
index 45278a10503..cb5ca93929f 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
@@ -1143,6 +1143,114 @@ class JoinITCase(expectedJoinType: JoinType) extends
BatchTestBase {
row(6, null, 1, 6, null, 1),
row(null, null, null, 4, 1.0, 1))
)
+
+ checkResult(
+ """
+ |select * from
+ | l inner join r on a = c where c IS NULL
+ |""".stripMargin,
+ Seq()
+ )
+
+ checkResult(
+ """
+ |select * from
+ | l inner join r on a = c where c = NULL
+ |""".stripMargin,
+ Seq()
+ )
+
+ if (expectedJoinType == NestedLoopJoin) {
+ // For inner join, we will push c = 3 into left side l by
+ // derived from a = c and c = 3.
+ checkResult(
+ """
+ |select * from
+ | l inner join r on a = c where c = 3
+ |""".stripMargin,
+ Seq(
+ row(3, 3.0, 3, 2.0)
+ )
+ )
+
+ // For left join, we will push c = 3 into left side l by
+ // derived from a = c and c = 3.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c = 3
+ |""".stripMargin,
+ Seq(
+ row(3, 3.0, 3, 2.0)
+ )
+ )
+ }
+
+ // For left/right join, we will only push equal filter condition into
+ // other side by derived from join condition and filter condition. So,
+ // c IS NULL cannot be push into left side.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c IS NULL
+ |""".stripMargin,
+ Seq(
+ row(1, 2.0, null, null),
+ row(1, 2.0, null, null),
+ row(null, 5.0, null, null),
+ row(null, null, null, null)
+ )
+ )
+
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c IS NULL AND a <= 1
+ |""".stripMargin,
+ Seq(
+ row(1, 2.0, null, null),
+ row(1, 2.0, null, null)
+ )
+ )
+
+ // For 'c = NULL', all data cannot match this condition.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c = NULL
+ |""".stripMargin,
+ Seq()
+ )
+
+ // For left/right join, we will only push equal filter condition into
+ // other side by derived from join condition and filter condition. So,
+ // c < 3 cannot be push into left side.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c < 3 AND a <= 3
+ |""".stripMargin,
+ Seq(
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0)
+ )
+ )
+
+ // C <> 3 cannot be push into left side.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c <> 3 AND a <= 3
+ |""".stripMargin,
+ Seq(
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0)
+ )
+ )
}
@Test
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 86863160348..ac59bc2623f 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -1501,6 +1501,75 @@ class JoinITCase(state: StateBackendMode) extends
StreamingWithStateTestBase(sta
row(6, null, 1, 6, null, 1),
row(null, null, null, 4, 1.0, 1))
)
+
+ // For left join, we will push c = 3 into left side l by
+ // derived from a = c and c = 3.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c = 3
+ |""".stripMargin,
+ Seq(
+ row(3, 3.0, 3, 2.0)
+ )
+ )
+
+ // For left/right join, we will only push equal filter condition into
+ // other side by derived from join condition and filter condition. So,
+ // c IS NULL cannot be push into left side.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c IS NULL
+ |""".stripMargin,
+ Seq(
+ row(1, 2.0, null, null),
+ row(1, 2.0, null, null),
+ row(null, 5.0, null, null),
+ row(null, null, null, null)
+ )
+ )
+
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c IS NULL AND a <= 1
+ |""".stripMargin,
+ Seq(
+ row(1, 2.0, null, null),
+ row(1, 2.0, null, null)
+ )
+ )
+
+ // For left/right join, we will only push equal filter condition into
+ // other side by derived from join condition and filter condition. So,
+ // c < 3 cannot be push into left side.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c < 3 AND a <= 3
+ |""".stripMargin,
+ Seq(
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0)
+ )
+ )
+
+ // C <> 3 cannot be push into left side.
+ checkResult(
+ """
+ |select * from
+ | l left join r on a = c where c <> 3 AND a <= 3
+ |""".stripMargin,
+ Seq(
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0),
+ row(2, 1.0, 2, 3.0)
+ )
+ )
}
@Test