This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 091c7fa [FLINK-15408][table-planner-blink] Interval join supports
non-equal condition
091c7fa is described below
commit 091c7fafbc5e42ef76757b51144d7a0e273a7f29
Author: wangxlong <[email protected]>
AuthorDate: Tue Nov 3 16:32:02 2020 +0800
[FLINK-15408][table-planner-blink] Interval join supports non-equal
condition
---
.../physical/stream/StreamExecIntervalJoin.scala | 2 +-
.../stream/StreamExecIntervalJoinRule.scala | 4 +-
.../plan/stream/sql/join/IntervalJoinTest.xml | 56 ++++++++++++++++++++++
.../plan/stream/sql/join/IntervalJoinTest.scala | 22 +++++++++
.../runtime/stream/sql/IntervalJoinITCase.scala | 51 ++++++++++++++++++++
5 files changed, 131 insertions(+), 4 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
index 57183df..7c88c96 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
@@ -69,7 +69,7 @@ class StreamExecIntervalJoin(
with StreamPhysicalRel
with StreamExecNode[RowData] {
- if (containsPythonCall(remainCondition.get)) {
+ if (remainCondition.isDefined && containsPythonCall(remainCondition.get)) {
throw new TableException("Only inner join condition with equality
predicates supports the " +
"Python UDF taking the inputs from the left table and the right table at
the same time, " +
"e.g., ON T1.id = T2.id && pythonUdf(T1.a, T2.b)")
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
index 881c24c..a52291d 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
@@ -47,11 +47,9 @@ class StreamExecIntervalJoinRule
override def matches(call: RelOptRuleCall): Boolean = {
val join: FlinkLogicalJoin = call.rel(0)
val joinRowType = join.getRowType
- val joinInfo = join.analyzeCondition()
- // joins require an equi-condition or a conjunctive predicate with at
least one equi-condition
// TODO support SEMI/ANTI join
- if (!join.getJoinType.projectsRight || joinInfo.pairs().isEmpty) {
+ if (!join.getJoinType.projectsRight) {
return false
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
index 1b963eb..fee20c3 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
@@ -194,6 +194,34 @@ Calc(select=[a, b])
]]>
</Resource>
</TestCase>
+ <TestCase name="testProcessingTimeInnerJoinWithoutEqualCondition">
+ <Resource name="sql">
+ <![CDATA[
+SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+ t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime +
INTERVAL '1' HOUR
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$6])
++- LogicalJoin(condition=[AND(>=($3, -($8, 3600000:INTERVAL HOUR)), <=($3,
+($8, 3600000:INTERVAL HOUR)))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, b])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false,
leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1,
rightTimeIndex=1], where=[AND(>=(proctime, -(proctime0, 3600000:INTERVAL
HOUR)), <=(proctime, +(proctime0, 3600000:INTERVAL HOUR)))], select=[a,
proctime, b, proctime0])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a, proctime])
+ : +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[b, proctime])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testRowTimeInnerJoinAndWindowAggregationOnFirst">
<Resource name="sql">
<![CDATA[
@@ -475,6 +503,34 @@ Calc(select=[a, b])
]]>
</Resource>
</TestCase>
+ <TestCase name="testRowTimeInnerJoinWithoutEqualCondition">
+ <Resource name="sql">
+ <![CDATA[
+SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+ t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime +
INTERVAL '1' HOUR
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$6])
++- LogicalJoin(condition=[AND(>=($4, -($9, 10000:INTERVAL SECOND)), <=($4,
+($9, 3600000:INTERVAL HOUR)))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, b])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1,
rightTimeIndex=1], where=[AND(>=(rowtime, -(rowtime0, 10000:INTERVAL SECOND)),
<=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[a, rowtime, b,
rowtime0])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a, rowtime])
+ : +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[b, rowtime])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testRowTimeLeftOuterJoin">
<Resource name="sql">
<![CDATA[
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
index bca2c37..329ef72 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
@@ -127,6 +127,17 @@ class IntervalJoinTest extends TableTestBase {
}
@Test
+ def testProcessingTimeInnerJoinWithoutEqualCondition(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND
t2.proctime + INTERVAL '1' HOUR
+ """.stripMargin
+
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
def testRowTimeInnerJoinWithOnClause(): Unit = {
val sqlQuery =
"""
@@ -139,6 +150,17 @@ class IntervalJoinTest extends TableTestBase {
}
@Test
+ def testRowTimeInnerJoinWithoutEqualCondition(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+ | t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime
+ INTERVAL '1' HOUR
+ """.stripMargin
+
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
def testRowTimeInnerJoinWithWhereClause(): Unit = {
val sqlQuery =
"""
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
index 249c63d..13ae51f 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
@@ -178,6 +178,57 @@ class IntervalJoinITCase(mode: StateBackendMode) extends
StreamingWithStateTestB
assertEquals(expected, sink.getAppendResults.sorted)
}
+ /** Tests a rowtime inner join whose ON clause has no equality predicate. */
+ @Test
+ def testRowTimeInnerJoinWithoutEqualCondition(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT t2.key, t2.id, t1.id
+ |FROM T1 as t1 join T2 as t2 ON
+ | t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+ | t2.rowtime + INTERVAL '6' SECOND
+ |""".stripMargin
+
+ val data1 = new mutable.MutableList[(String, String, Long)]
+ // for boundary test
+ data1.+=(("A", "LEFT0.999", 999L))
+ data1.+=(("A", "LEFT1", 1000L))
+ data1.+=(("A", "LEFT2", 2000L))
+ data1.+=(("A", "LEFT3", 3000L))
+ data1.+=(("B", "LEFT4", 4000L))
+ data1.+=(("A", "LEFT5", 5000L))
+ data1.+=(("A", "LEFT6", 6000L))
+ // test null key
+ data1.+=((null.asInstanceOf[String], "LEFT8", 8000L))
+
+ val data2 = new mutable.MutableList[(String, String, Long)]
+ data2.+=(("A", "RIGHT6", 6000L))
+ data2.+=(("B", "RIGHT7", 7000L))
+ // test null key
+ data2.+=((null.asInstanceOf[String], "RIGHT10", 10000L))
+
+ val t1 = env.fromCollection(data1)
+ .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+ .toTable(tEnv, 'key, 'id, 'rowtime.rowtime)
+ val t2 = env.fromCollection(data2)
+ .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+ .toTable(tEnv, 'key, 'id, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+ val sink = new TestingAppendSink
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(sink)
+ env.execute()
+ val expected = mutable.MutableList(
+ "A,RIGHT6,LEFT1", "A,RIGHT6,LEFT2", "A,RIGHT6,LEFT3", "A,RIGHT6,LEFT4",
+ "A,RIGHT6,LEFT5", "A,RIGHT6,LEFT6", "A,RIGHT6,LEFT8", "B,RIGHT7,LEFT2",
+ "B,RIGHT7,LEFT3", "B,RIGHT7,LEFT4", "B,RIGHT7,LEFT5", "B,RIGHT7,LEFT6",
+ "B,RIGHT7,LEFT8", "null,RIGHT10,LEFT5", "null,RIGHT10,LEFT6",
"null,RIGHT10,LEFT8"
+ )
+ assertEquals(expected, sink.getAppendResults.sorted)
+ }
+
@Test
def testUnboundedAggAfterRowtimeInnerJoin(): Unit = {
val innerSql=