This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f366f77 [FLINK-24139][table-planner] Push down more predicates through Join in stream mode
f366f77 is described below
commit f366f779efd586470b2668be8736ca992db3054c
Author: xuyang <[email protected]>
AuthorDate: Tue Sep 14 10:01:21 2021 +0800
[FLINK-24139][table-planner] Push down more predicates through Join in stream mode
This closes #17272
---
.../plan/optimize/program/FlinkStreamProgram.scala | 19 +++-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 6 ++
.../planner/plan/stream/sql/join/JoinTest.xml | 74 ++++++++++++++
.../table/planner/plan/stream/table/JoinTest.xml | 105 ++++++++++++++++----
.../planner/plan/stream/sql/join/JoinTest.scala | 15 +++
.../table/planner/plan/stream/table/JoinTest.scala | 42 ++++++++
.../planner/runtime/stream/sql/JoinITCase.scala | 81 ++++++++++++++++
.../planner/runtime/stream/table/JoinITCase.scala | 107 +++++++++++++++++++++
8 files changed, 425 insertions(+), 24 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
index abc6527..9287f1d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
@@ -121,11 +121,20 @@ object FlinkStreamProgram {
PREDICATE_PUSHDOWN,
FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
.addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES)
- .build(), "filter rules")
+ FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder[StreamOptimizeContext]
+
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkStreamRuleSets.JOIN_PREDICATE_REWRITE_RULES)
+ .build(), "join predicate rewrite")
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder
+
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES)
+ .build(), "filter rules")
+ .setIterations(5).build(), "predicate rewrite")
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index da81412..e2f765b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -160,6 +160,12 @@ object FlinkStreamRuleSets {
)
/**
+   * RuleSet to derive sub-conditions which can be pushed into join inputs.
+   */
+  val JOIN_PREDICATE_REWRITE_RULES: RuleSet = RuleSets.ofList(
+    JoinDependentConditionDerivationRule.INSTANCE)
+
+ /**
* RuleSet to do predicate pushdown
*/
val FILTER_PREPARE_RULES: RuleSet = RuleSets.ofList((
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index 12dd065..fefb11d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -16,6 +16,80 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testDependentConditionDerivationInnerJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2
AND b2 = 2)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$3])
++- LogicalJoin(condition=[OR(AND(=($0, 1), =($3, 1)), AND(=($1, 2), =($4,
2)))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, A, source:
[TestTableSource(a1, a2, a3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, B, source:
[TestTableSource(b1, b2, b3)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1])
++- Join(joinType=[InnerJoin], where=[(((a1 = 1) AND (b1 = 1)) OR ((a2 =
2:BIGINT) AND (b2 = 2:BIGINT)))], select=[a1, a2, b1, b2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a1, a2], where=[((a1 = 1) OR (a2 = 2:BIGINT))])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[b1, b2], where=[((b1 = 1) OR (b2 = 2:BIGINT))])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDependentConditionDerivationInnerJoinWithNull">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM t JOIN s ON (a = 1 AND x = 1) OR (a = 2 AND y is
null)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4], z=[$5])
++- LogicalJoin(condition=[OR(AND(=($0, 1), =($3, 1)), AND(=($0, 2), IS
NULL($4)))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, t, source:
[TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, s, source:
[TestTableSource(x, y, z)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Join(joinType=[InnerJoin], where=[(((a = 1) AND (x = 1:BIGINT)) OR ((a = 2)
AND y IS NULL))], select=[a, b, c, x, y, z], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[single])
+: +- Calc(select=[a, b, c], where=[SEARCH(a, Sarg[1, 2])])
+: +- LegacyTableSourceScan(table=[[default_catalog, default_database, t,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[single])
+ +- Calc(select=[x, y, z], where=[(y IS NULL OR (x = 1:BIGINT))])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, s,
source: [TestTableSource(x, y, z)]]], fields=[x, y, z])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDependentConditionDerivationInnerJoinWithTrue">
+ <Resource name="sql">
+ <![CDATA[SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2
AND true)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$3])
++- LogicalJoin(condition=[OR(AND(=($0, 1), =($3, 1)), SEARCH($1,
Sarg[2L:BIGINT]:BIGINT))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, A, source:
[TestTableSource(a1, a2, a3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, B, source:
[TestTableSource(b1, b2, b3)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1])
++- Join(joinType=[InnerJoin], where=[(((a1 = 1) AND (b1 = 1)) OR SEARCH(a2,
Sarg[2L:BIGINT]:BIGINT))], select=[a1, a2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a1, a2], where=[((a1 = 1) OR SEARCH(a2,
Sarg[2L:BIGINT]:BIGINT))])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[b1])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testFullJoin">
<Resource name="sql">
<![CDATA[SELECT a1, b1 FROM A FULL JOIN B ON a1 = b1]]>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
index 9767b01..5e18cc0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
@@ -16,6 +16,92 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testDependentConditionDerivationInnerJoin">
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($3, 1)), AND(=($1, 2), =($3,
5)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Join(joinType=[InnerJoin], where=[(((a = 1) AND (d = 1)) OR ((b = 2) AND (d =
5)))], select=[a, b, d, e], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[single])
+: +- Calc(select=[a, b], where=[((a = 1) OR (b = 2))])
+: +- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[a, b, c])
++- Exchange(distribution=[single])
+ +- Calc(select=[d, e], where=[SEARCH(d, Sarg[1, 5])])
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[d, e, f])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRowTimeInnerJoinWithTimeAccessed">
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[AND(=($0, $4), >=($3, -($7, 300000:INTERVAL DAY TO
SECOND)), <($3, $7), >($3, $6))])
++- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3],
where=[((a = d) AND (lrtime >= (rrtime - 300000:INTERVAL DAY TO SECOND)) AND
(lrtime < rrtime) AND (lrtime > f))], select=[a, b, c, lrtime, d, e, f, rrtime])
+:- Exchange(distribution=[hash[a]])
+: +- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[a, b, c, lrtime])
++- Exchange(distribution=[hash[d]])
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[d, e, f, rrtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDependentConditionDerivationInnerJoinWithNull">
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 0), =($3, 3)), AND(=($0, 1), IS
NULL($5)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b, d, e])
++- Join(joinType=[InnerJoin], where=[(((a = 0) AND (d = 3)) OR ((a = 1) AND f
IS NULL))], select=[a, b, d, e, f], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a, b], where=[SEARCH(a, Sarg[0, 1])])
+ : +- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[a, b, c])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[d, e, f], where=[(f IS NULL OR (d = 3))])
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[d, e, f])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDependentConditionDerivationInnerJoinWithTrue">
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 0), =($3, 3)), SEARCH($0, Sarg[1]))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Join(joinType=[InnerJoin], where=[(((a = 0) AND (d = 3)) OR SEARCH(a,
Sarg[1]))], select=[a, b, d, e], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[single])
+: +- Calc(select=[a, b], where=[SEARCH(a, Sarg[0, 1])])
+: +- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[a, b, c])
++- Exchange(distribution=[single])
+ +- Calc(select=[d, e])
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[d, e, f])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testLeftOuterJoinEquiAndLocalPred">
<Resource name="ast">
<![CDATA[
@@ -59,25 +145,6 @@ Calc(select=[b, y])
]]>
</Resource>
</TestCase>
- <TestCase name="testRowTimeInnerJoinWithTimeAccessed">
- <Resource name="ast">
- <![CDATA[
-LogicalFilter(condition=[AND(=($0, $4), >=($3, -($7, 300000:INTERVAL DAY TO
SECOND)), <($3, $7), >($3, $6))])
-+- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
- +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
-IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3],
where=[((a = d) AND (lrtime >= (rrtime - 300000:INTERVAL DAY TO SECOND)) AND
(lrtime < rrtime) AND (lrtime > f))], select=[a, b, c, lrtime, d, e, f, rrtime])
-:- Exchange(distribution=[hash[a]])
-: +- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[a, b, c, lrtime])
-+- Exchange(distribution=[hash[d]])
- +- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[d, e, f, rrtime])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testLeftOuterJoinEquiPred">
<Resource name="ast">
<
@Test
+  def testDependentConditionDerivationInnerJoin(): Unit = {
+    // Expected to derive (a1 = 1 OR a2 = 2) for A and (b1 = 1 OR b2 = 2) for B.
+    util.verifyExecPlan(
+      "SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2 AND b2 = 2)")
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue(): Unit = {
+    // The second OR branch references only A, so nothing can be derived for B.
+    util.verifyExecPlan(
+      "SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2 AND true)")
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull(): Unit = {
+    // The IS NULL predicate on s must be kept in the condition derived for s.
+    util.verifyExecPlan(
+      "SELECT * FROM t JOIN s ON (a = 1 AND x = 1) OR (a = 2 AND y is null)")
+  }
+
+ @Test
def testInnerJoin(): Unit = {
util.verifyExecPlan("SELECT a1, b1 FROM A JOIN B ON a1 = b1")
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala
index 4769573..e5637da 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala
@@ -31,6 +31,48 @@ import java.sql.Timestamp
*/
class JoinTest extends TableTestBase {
+  @Test
+  def testDependentConditionDerivationInnerJoin(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addDataStream[(Long, Int, String)]("T1", 'a, 'b, 'c)
+    val right = util.addDataStream[(Long, Int, String)]("T2", 'd, 'e, 'f)
+
+    // Both OR branches reference T1 and T2, so a condition can be derived for each input.
+    val resultTable = left.join(right)
+      .where(('a === 1 && 'd === 1) || ('b === 2 && 'd === 5))
+      .select('a, 'b, 'd, 'e)
+
+    util.verifyExecPlan(resultTable)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addDataStream[(Long, Int, String)]("T1", 'a, 'b, 'c)
+    val right = util.addDataStream[(Long, Int, String)]("T2", 'd, 'e, 'f)
+
+    // The second OR branch references only T1, so nothing can be derived for T2.
+    val resultTable = left.join(right)
+      .where(('a === 0 && 'd === 3) || ('a === 1 && true))
+      .select('a, 'b, 'd, 'e)
+
+    util.verifyExecPlan(resultTable)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addDataStream[(Long, Int, String)]("T1", 'a, 'b, 'c)
+    val right = util.addDataStream[(Long, Int, String)]("T2", 'd, 'e, 'f)
+
+    // The IS NULL predicate on T2 must be kept in the condition derived for T2.
+    val resultTable = left.join(right)
+      .where(('a === 0 && 'd === 3) || ('a === 1 && 'f.isNull))
+      .select('a, 'b, 'd, 'e)
+
+    util.verifyExecPlan(resultTable)
+  }
+
// Tests for inner join
@Test
def testRowTimeWindowInnerJoin(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 1a5790e..bcf483d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -73,6 +73,87 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
// Tests for inner join.
override def after(): Unit = {}
+
+  @Test
+  def testDependentConditionDerivationInnerJoin(): Unit = {
+    // Each OR branch constrains both A and B.
+    val sqlQuery = "SELECT * FROM A, B WHERE (a2 = 1 and b2 = 2) or (a1 = 2 and b1 = 4)"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,2,2,1,Hallo Welt,2",
+      "2,2,Hello,4,10,9,FGH,2",
+      "2,2,Hello,4,7,6,CDE,2",
+      "2,2,Hello,4,8,7,DEF,1",
+      "2,2,Hello,4,9,8,EFG,1")
+
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue(): Unit = {
+    // The first OR branch constrains only A, so every B row joins the a2 = 1 row.
+    val sqlQuery = "SELECT * FROM A, B WHERE (a2 = 1 AND true) OR (a1 = 2 AND b1 = 4) "
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1,1,0,Hallo,1",
+      "1,1,Hi,2,2,1,Hallo Welt,2",
+      "1,1,Hi,2,3,2,Hallo Welt wie,1",
+      "1,1,Hi,3,4,3,Hallo Welt wie gehts?,2",
+      "1,1,Hi,3,5,4,ABC,2",
+      "1,1,Hi,3,6,5,BCD,3",
+      "1,1,Hi,4,10,9,FGH,2",
+      "1,1,Hi,4,7,6,CDE,2",
+      "1,1,Hi,4,8,7,DEF,1",
+      "1,1,Hi,4,9,8,EFG,1",
+      "1,1,Hi,5,11,10,GHI,1",
+      "1,1,Hi,5,12,11,HIJ,3",
+      "1,1,Hi,5,13,12,IJK,3",
+      "1,1,Hi,5,14,13,JKL,2",
+      "1,1,Hi,5,15,14,KLM,2",
+      "2,2,Hello,4,10,9,FGH,2",
+      "2,2,Hello,4,7,6,CDE,2",
+      "2,2,Hello,4,8,7,DEF,1",
+      "2,2,Hello,4,9,8,EFG,1")
+
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull(): Unit = {
+    // The second OR branch tests the right input for NULL; b3 is null only in the (4, 5) row.
+    val data1 = List(
+      (0, 1, "hi a1"),
+      (1, 2, "hi a2"),
+      (2, 3, "hi a3"))
+
+    val data2 = List(
+      (3, 4, "hi b1"),
+      (4, 5, null),
+      (5, 6, "hi b3"))
+
+    val table1 = failingDataSource(data1).toTable(tEnv, 'a1, 'a2, 'a3)
+    val table2 = failingDataSource(data2).toTable(tEnv, 'b1, 'b2, 'b3)
+    tEnv.createTemporaryView("a", table1)
+    tEnv.createTemporaryView("b", table2)
+
+    val sqlQuery = "SELECT * FROM a, b WHERE (a1 = 1 AND b1 = 3) OR (a1 = 2 AND b3 is null) "
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      "1,2,hi a2,3,4,hi b1",
+      "2,3,hi a3,4,5,null")
+
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
/** test non-window inner join **/
@Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index 958d6f8..1be50c3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -86,6 +86,113 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
}
@Test
+  def testDependentConditionDerivationInnerJoin(): Unit = {
+    // Every OR branch mixes a left-side and a right-side predicate.
+    val leftData = List((0, 1), (1, 2), (2, 3))
+    val rightData = List((3, 4), (4, 5), (5, 6))
+    val leftTable = failingDataSource(leftData).toTable(tEnv, 'a1, 'a2)
+    val rightTable = failingDataSource(rightData).toTable(tEnv, 'b1, 'b2)
+
+    val joinedTable = leftTable
+      .join(rightTable)
+      .where(('a1 === 0 && 'b1 === 3) || ('a1 === 1 && 'b2 === 5))
+      .select('a1, 'a2, 'b1, 'b2)
+
+    val sink = new TestingAppendSink
+    joinedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq("0,1,3,4", "1,2,4,5")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue(): Unit = {
+    // The second OR branch constrains only the left input, so a1 = 1 pairs with every right row.
+    val leftData = List((0, 1), (1, 2), (2, 3))
+    val rightData = List((3, 4), (4, 5), (5, 6))
+    val leftTable = failingDataSource(leftData).toTable(tEnv, 'a1, 'a2)
+    val rightTable = failingDataSource(rightData).toTable(tEnv, 'b1, 'b2)
+
+    val joinedTable = leftTable
+      .join(rightTable)
+      .where(('a1 === 0 && 'b1 === 3) || ('a1 === 1 && true))
+      .select('a1, 'a2, 'b1, 'b2)
+
+    val sink = new TestingAppendSink
+    joinedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq("0,1,3,4", "1,2,3,4", "1,2,4,5", "1,2,5,6")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull(): Unit = {
+    // The second OR branch tests the right input for NULL; b3 is null only in the (4, 5) row.
+    val leftData = List((0, 1, "hi a1"), (1, 2, "hi a2"), (2, 3, "hi a3"))
+    val rightData = List((3, 4, "hi b1"), (4, 5, null), (5, 6, "hi b3"))
+    val leftTable = failingDataSource(leftData).toTable(tEnv, 'a1, 'a2, 'a3)
+    val rightTable = failingDataSource(rightData).toTable(tEnv, 'b1, 'b2, 'b3)
+
+    val joinedTable = leftTable
+      .join(rightTable)
+      .where(('a1 === 0 && 'b1 === 3) || ('a1 === 1 && 'b3.isNull))
+      .select('a1, 'a2, 'a3, 'b1, 'b2, 'b3)
+
+    val sink = new TestingAppendSink
+    joinedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq("0,1,hi a1,3,4,hi b1", "1,2,hi a2,4,5,null")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+ @Test
def testInnerJoinOutputWithPk(): Unit = {
// data input
val data1 = List(