This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f366f77  [FLINK-24139][table-planner] Push down more predicates 
through Join in stream mode
f366f77 is described below

commit f366f779efd586470b2668be8736ca992db3054c
Author: xuyang <[email protected]>
AuthorDate: Tue Sep 14 10:01:21 2021 +0800

    [FLINK-24139][table-planner] Push down more predicates through Join in 
stream mode
    
    This closes #17272
---
 .../plan/optimize/program/FlinkStreamProgram.scala |  19 +++-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   6 ++
 .../planner/plan/stream/sql/join/JoinTest.xml      |  74 ++++++++++++++
 .../table/planner/plan/stream/table/JoinTest.xml   | 105 ++++++++++++++++----
 .../planner/plan/stream/sql/join/JoinTest.scala    |  15 +++
 .../table/planner/plan/stream/table/JoinTest.scala |  42 ++++++++
 .../planner/runtime/stream/sql/JoinITCase.scala    |  81 ++++++++++++++++
 .../planner/runtime/stream/table/JoinITCase.scala  | 107 +++++++++++++++++++++
 8 files changed, 425 insertions(+), 24 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
index abc6527..9287f1d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala
@@ -121,11 +121,20 @@ object FlinkStreamProgram {
       PREDICATE_PUSHDOWN,
       FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
         .addProgram(
-          FlinkHepRuleSetProgramBuilder.newBuilder
-            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
-            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
-            .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES)
-            .build(), "filter rules")
+          FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
+            .addProgram(
+              FlinkHepRuleSetProgramBuilder.newBuilder[StreamOptimizeContext]
+                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                .add(FlinkStreamRuleSets.JOIN_PREDICATE_REWRITE_RULES)
+                .build(), "join predicate rewrite")
+            .addProgram(
+              FlinkHepRuleSetProgramBuilder.newBuilder
+                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES)
+                .build(), "filter rules")
+            .setIterations(5).build(), "predicate rewrite")
         .addProgram(
           FlinkHepRuleSetProgramBuilder.newBuilder
             .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index da81412..e2f765b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -160,6 +160,12 @@ object FlinkStreamRuleSets {
   )
 
   /**
+   * RuleSet to extract sub-condition which can be pushed into join inputs
+   */
+  val JOIN_PREDICATE_REWRITE_RULES: RuleSet = RuleSets.ofList(
+    RuleSets.ofList(JoinDependentConditionDerivationRule.INSTANCE))
+
+  /**
     * RuleSet to do predicate pushdown
     */
   val FILTER_PREPARE_RULES: RuleSet = RuleSets.ofList((
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index 12dd065..fefb11d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -16,6 +16,80 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testDependentConditionDerivationInnerJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2 
AND b2 = 2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$3])
++- LogicalJoin(condition=[OR(AND(=($0, 1), =($3, 1)), AND(=($1, 2), =($4, 
2)))], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, A, source: 
[TestTableSource(a1, a2, a3)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, B, source: 
[TestTableSource(b1, b2, b3)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- Join(joinType=[InnerJoin], where=[(((a1 = 1) AND (b1 = 1)) OR ((a2 = 
2:BIGINT) AND (b2 = 2:BIGINT)))], select=[a1, a2, b1, b2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[single])
+   :  +- Calc(select=[a1, a2], where=[((a1 = 1) OR (a2 = 2:BIGINT))])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[b1, b2], where=[((b1 = 1) OR (b2 = 2:BIGINT))])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDependentConditionDerivationInnerJoinWithNull">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t JOIN s ON (a = 1 AND x = 1) OR (a = 2 AND y is 
null)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4], z=[$5])
++- LogicalJoin(condition=[OR(AND(=($0, 1), =($3, 1)), AND(=($0, 2), IS 
NULL($4)))], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, s, source: 
[TestTableSource(x, y, z)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Join(joinType=[InnerJoin], where=[(((a = 1) AND (x = 1:BIGINT)) OR ((a = 2) 
AND y IS NULL))], select=[a, b, c, x, y, z], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[single])
+:  +- Calc(select=[a, b, c], where=[SEARCH(a, Sarg[1, 2])])
+:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, t, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[single])
+   +- Calc(select=[x, y, z], where=[(y IS NULL OR (x = 1:BIGINT))])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, s, 
source: [TestTableSource(x, y, z)]]], fields=[x, y, z])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDependentConditionDerivationInnerJoinWithTrue">
+    <Resource name="sql">
+      <![CDATA[SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2 
AND true)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$3])
++- LogicalJoin(condition=[OR(AND(=($0, 1), =($3, 1)), SEARCH($1, 
Sarg[2L:BIGINT]:BIGINT))], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, A, source: 
[TestTableSource(a1, a2, a3)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, B, source: 
[TestTableSource(b1, b2, b3)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- Join(joinType=[InnerJoin], where=[(((a1 = 1) AND (b1 = 1)) OR SEARCH(a2, 
Sarg[2L:BIGINT]:BIGINT))], select=[a1, a2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[single])
+   :  +- Calc(select=[a1, a2], where=[((a1 = 1) OR SEARCH(a2, 
Sarg[2L:BIGINT]:BIGINT))])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[b1])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testFullJoin">
     <Resource name="sql">
       <![CDATA[SELECT a1, b1 FROM A FULL JOIN B ON a1 = b1]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
index 9767b01..5e18cc0 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml
@@ -16,6 +16,92 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testDependentConditionDerivationInnerJoin">
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($3, 1)), AND(=($1, 2), =($3, 
5)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Join(joinType=[InnerJoin], where=[(((a = 1) AND (d = 1)) OR ((b = 2) AND (d = 
5)))], select=[a, b, d, e], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[single])
+:  +- Calc(select=[a, b], where=[((a = 1) OR (b = 2))])
+:     +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[a, b, c])
++- Exchange(distribution=[single])
+   +- Calc(select=[d, e], where=[SEARCH(d, Sarg[1, 5])])
+      +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRowTimeInnerJoinWithTimeAccessed">
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[AND(=($0, $4), >=($3, -($7, 300000:INTERVAL DAY TO 
SECOND)), <($3, $7), >($3, $6))])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3], 
where=[((a = d) AND (lrtime >= (rrtime - 300000:INTERVAL DAY TO SECOND)) AND 
(lrtime < rrtime) AND (lrtime > f))], select=[a, b, c, lrtime, d, e, f, rrtime])
+:- Exchange(distribution=[hash[a]])
+:  +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[a, b, c, lrtime])
++- Exchange(distribution=[hash[d]])
+   +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[d, e, f, rrtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDependentConditionDerivationInnerJoinWithNull">
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 0), =($3, 3)), AND(=($0, 1), IS 
NULL($5)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, d, e])
++- Join(joinType=[InnerJoin], where=[(((a = 0) AND (d = 3)) OR ((a = 1) AND f 
IS NULL))], select=[a, b, d, e, f], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[single])
+   :  +- Calc(select=[a, b], where=[SEARCH(a, Sarg[0, 1])])
+   :     +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[a, b, c])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[d, e, f], where=[(f IS NULL OR (d = 3))])
+         +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDependentConditionDerivationInnerJoinWithTrue">
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 0), =($3, 3)), SEARCH($0, Sarg[1]))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Join(joinType=[InnerJoin], where=[(((a = 0) AND (d = 3)) OR SEARCH(a, 
Sarg[1]))], select=[a, b, d, e], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[single])
+:  +- Calc(select=[a, b], where=[SEARCH(a, Sarg[0, 1])])
+:     +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[a, b, c])
++- Exchange(distribution=[single])
+   +- Calc(select=[d, e])
+      +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testLeftOuterJoinEquiAndLocalPred">
     <Resource name="ast">
       <![CDATA[
@@ -59,25 +145,6 @@ Calc(select=[b, y])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRowTimeInnerJoinWithTimeAccessed">
-    <Resource name="ast">
-      <![CDATA[
-LogicalFilter(condition=[AND(=($0, $4), >=($3, -($7, 300000:INTERVAL DAY TO 
SECOND)), <($3, $7), >($3, $6))])
-+- LogicalJoin(condition=[true], joinType=[inner])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-300000, leftUpperBound=-1, leftTimeIndex=3, rightTimeIndex=3], 
where=[((a = d) AND (lrtime >= (rrtime - 300000:INTERVAL DAY TO SECOND)) AND 
(lrtime < rrtime) AND (lrtime > f))], select=[a, b, c, lrtime, d, e, f, rrtime])
-:- Exchange(distribution=[hash[a]])
-:  +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[a, b, c, lrtime])
-+- Exchange(distribution=[hash[d]])
-   +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[d, e, f, rrtime])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testLeftOuterJoinEquiPred">
     <Resource name="ast">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index fd70df2..d6c41ab 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -33,6 +33,21 @@ class JoinTest extends TableTestBase {
   util.addTableSource[(Long, String, Int)]("s", 'x, 'y, 'z)
 
   @Test
+  def testDependentConditionDerivationInnerJoin: Unit = {
+    util.verifyExecPlan("SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2 AND b2 = 2)")
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue: Unit = {
+    util.verifyExecPlan("SELECT a1, b1 FROM A JOIN B ON (a1 = 1 AND b1 = 1) OR (a2 = 2 AND true)")
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull: Unit = {
+    util.verifyExecPlan("SELECT * FROM t JOIN s ON (a = 1 AND x = 1) OR (a = 2 AND y is null)")
+  }
+
+  @Test
   def testInnerJoin(): Unit = {
     util.verifyExecPlan("SELECT a1, b1 FROM A JOIN B ON a1 = b1")
   }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala
index 4769573..e5637da 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/JoinTest.scala
@@ -31,6 +31,48 @@ import java.sql.Timestamp
   */
 class JoinTest extends TableTestBase {
 
+  @Test
+  def testDependentConditionDerivationInnerJoin: Unit = {
+    val util = streamTestUtil()
+    val left = util.addDataStream[(Long, Int, String)]("T1", 'a, 'b, 'c)
+
+    val right = util.addDataStream[(Long, Int, String)]("T2",'d, 'e, 'f)
+
+    val resultTable = left.join(right)
+      .where(('a === 1 && 'd === 1) || ('b === 2 && 'd === 5))
+      .select('a,'b,'d,'e)
+
+    util.verifyExecPlan(resultTable)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue: Unit = {
+    val util = streamTestUtil()
+    val left = util.addDataStream[(Long, Int, String)]("T1", 'a, 'b, 'c)
+
+    val right = util.addDataStream[(Long, Int, String)]("T2",'d, 'e, 'f)
+
+    val resultTable = left.join(right)
+      .where(('a === 0 && 'd === 3) || ('a === 1 && true))
+      .select('a,'b,'d,'e)
+
+    util.verifyExecPlan(resultTable)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull: Unit = {
+    val util = streamTestUtil()
+    val left = util.addDataStream[(Long, Int, String)]("T1", 'a, 'b, 'c)
+
+    val right = util.addDataStream[(Long, Int, String)]("T2",'d, 'e, 'f)
+
+    val resultTable = left.join(right)
+      .where(('a === 0 && 'd === 3) || ('a === 1 && 'f.isNull))
+      .select('a,'b,'d,'e)
+
+    util.verifyExecPlan(resultTable)
+  }
+
   // Tests for inner join
   @Test
   def testRowTimeWindowInnerJoin(): Unit = {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 1a5790e..bcf483d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -73,6 +73,87 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
   // Tests for inner join.
   override def after(): Unit = {}
 
+  @Test
+  def testDependentConditionDerivationInnerJoin(): Unit = {
+    val sqlQuery = "SELECT * FROM A, B WHERE (a2 = 1 and b2 = 2) or (a1 = 2 and b1 = 4)"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.Seq(
+      "1,1,Hi,2,2,1,Hallo Welt,2",
+      "2,2,Hello,4,10,9,FGH,2",
+      "2,2,Hello,4,7,6,CDE,2",
+      "2,2,Hello,4,8,7,DEF,1",
+      "2,2,Hello,4,9,8,EFG,1")
+
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue(): Unit = {
+    val sqlQuery = "SELECT * FROM A, B WHERE (a2 = 1 AND true) OR (a1 = 2 AND b1 = 4) "
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hi,1,1,0,Hallo,1" ,
+      "1,1,Hi,2,2,1,Hallo Welt,2",
+      "1,1,Hi,2,3,2,Hallo Welt wie,1",
+      "1,1,Hi,3,4,3,Hallo Welt wie gehts?,2",
+      "1,1,Hi,3,5,4,ABC,2",
+      "1,1,Hi,3,6,5,BCD,3",
+      "1,1,Hi,4,10,9,FGH,2",
+      "1,1,Hi,4,7,6,CDE,2",
+      "1,1,Hi,4,8,7,DEF,1",
+      "1,1,Hi,4,9,8,EFG,1",
+      "1,1,Hi,5,11,10,GHI,1",
+      "1,1,Hi,5,12,11,HIJ,3",
+      "1,1,Hi,5,13,12,IJK,3",
+      "1,1,Hi,5,14,13,JKL,2",
+      "1,1,Hi,5,15,14,KLM,2",
+      "2,2,Hello,4,10,9,FGH,2",
+      "2,2,Hello,4,7,6,CDE,2",
+      "2,2,Hello,4,8,7,DEF,1",
+      "2,2,Hello,4,9,8,EFG,1").toList
+
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull(): Unit = {
+    val data1 = List(
+      (0, 1, "hi a1"),
+      (1, 2, "hi a2"),
+      (2, 3, "hi a3")
+    )
+
+    val data2 = List(
+      (3, 4, "hi b1"),
+      (4, 5, null),
+      (5, 6, "hi b3")
+    )
+
+    val table1 = failingDataSource(data1).toTable(tEnv,'a1, 'a2, 'a3)
+    val table2 = failingDataSource(data2).toTable(tEnv,'b1, 'b2, 'b3)
+    tEnv.registerTable("a",table1)
+    tEnv.registerTable("b",table2)
+
+    val sqlQuery = "SELECT * FROM a, b WHERE (a1 = 1 AND b1 = 3) OR (a1 = 2 AND b3 is null) "
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,hi a2,3,4,hi b1" ,
+      "2,3,hi a3,4,5,null").toList
+
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
 
   /** test non-window inner join **/
   @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index 958d6f8..1be50c3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -86,6 +86,113 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
   }
 
   @Test
+  def testDependentConditionDerivationInnerJoin(): Unit = {
+    val data1 = List(
+      (0, 1),
+      (1, 2),
+      (2, 3)
+    )
+
+    val data2 = List(
+      (3, 4),
+      (4, 5),
+      (5, 6)
+    )
+
+    val leftTable = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
+    val rightTable = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
+
+    val joinedTable = leftTable.join(rightTable)
+      .where(('a1 === 0 && 'b1 === 3) || ('a1 === 1 && 'b2 === 5))
+      .select('a1, 'a2, 'b1, 'b2)
+
+
+    val sink = new TestingAppendSink
+    joinedTable.toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "0,1,3,4",
+        "1,2,4,5"
+    )
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithTrue(): Unit = {
+    val data1 = List(
+      (0, 1),
+      (1, 2),
+      (2, 3)
+    )
+
+    val data2 = List(
+      (3, 4),
+      (4, 5),
+      (5, 6)
+    )
+
+    val leftTable = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
+    val rightTable = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
+
+    val joinedTable = leftTable.join(rightTable)
+      .where(('a1 === 0 && 'b1 === 3) || ('a1 === 1 && true))
+      .select('a1, 'a2, 'b1, 'b2)
+
+
+    val sink = new TestingAppendSink
+    joinedTable.toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "0,1,3,4",
+      "1,2,3,4",
+      "1,2,4,5",
+      "1,2,5,6"
+    )
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testDependentConditionDerivationInnerJoinWithNull(): Unit = {
+    val data1 = List(
+      (0, 1, "hi a1"),
+      (1, 2, "hi a2"),
+      (2, 3, "hi a3")
+    )
+
+    val data2 = List(
+      (3, 4, "hi b1"),
+      (4, 5, null),
+      (5, 6, "hi b3")
+    )
+
+    val leftTable = failingDataSource(data1).toTable(tEnv, 'a1, 'a2, 'a3)
+    val rightTable = failingDataSource(data2).toTable(tEnv, 'b1, 'b2, 'b3)
+
+    val joinedTable = leftTable.join(rightTable)
+      .where(('a1 === 0 && 'b1 === 3) || ('a1 === 1 && 'b3.isNull))
+      .select('a1, 'a2, 'a3, 'b1, 'b2, 'b3)
+
+
+    val sink = new TestingAppendSink
+    joinedTable.toAppendStream[Row].addSink(sink)
+
+    env.execute()
+
+    val expected = Seq(
+      "0,1,hi a1,3,4,hi b1",
+      "1,2,hi a2,4,5,null"
+    )
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
   def testInnerJoinOutputWithPk(): Unit = {
     // data input
     val data1 = List(

Reply via email to