This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new bf254ee  [FLINK-22098][table-planner-blink] Fix bug for window join: 
plan is wrong if join condition contains 'IS NOT DISTINCT FROM'
bf254ee is described below

commit bf254ee40ad3db24544dcbd53af146e50118d751
Author: Jing Zhang <[email protected]>
AuthorDate: Fri Apr 2 11:42:27 2021 +0800

    [FLINK-22098][table-planner-blink] Fix bug for window join: plan is wrong 
if join condition contains 'IS NOT DISTINCT FROM'
    
    This closes #15476
    
    (cherry picked from commit 6604c5c65e74224970be1a4c89e2ca20b1c92a0e)
---
 .../stream/StreamPhysicalWindowJoinRule.scala      | 18 +++---
 .../table/planner/plan/utils/WindowJoinUtil.scala  | 31 ++++-----
 .../plan/stream/sql/join/WindowJoinTest.xml        | 74 ++++++++++++++++++++++
 .../plan/stream/sql/join/WindowJoinTest.scala      | 37 +++++++++++
 4 files changed, 130 insertions(+), 30 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala
index ad84dd7..5425e21 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala
@@ -29,8 +29,6 @@ import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 
-import java.util
-
 /**
  * Rule to convert a [[FlinkLogicalJoin]] into a [[StreamPhysicalWindowJoin]].
  */
@@ -49,19 +47,19 @@ class StreamPhysicalWindowJoinRule
   override def onMatch(call: RelOptRuleCall): Unit = {
 
     def toHashTraitByColumns(
-        columns: util.Collection[_ <: Number],
+        columns: Array[Int],
         inputTraitSet: RelTraitSet): RelTraitSet = {
-      val distribution = if (columns.size() == 0) {
+      val distribution = if (columns.isEmpty) {
         FlinkRelDistribution.SINGLETON
       } else {
-        FlinkRelDistribution.hash(columns)
+        FlinkRelDistribution.hash(columns, true)
       }
       inputTraitSet
         .replace(FlinkConventions.STREAM_PHYSICAL)
         .replace(distribution)
     }
 
-    def convertInput(input: RelNode, columns: util.Collection[_ <: Number]): 
RelNode = {
+    def convertInput(input: RelNode, columns: Array[Int]): RelNode = {
       val requiredTraitSet = toHashTraitByColumns(columns, input.getTraitSet)
       RelOptRule.convert(input, requiredTraitSet)
     }
@@ -89,13 +87,13 @@ class StreamPhysicalWindowJoinRule
     val leftWindowing = new WindowAttachedWindowingStrategy(
       leftWindowProperties.getWindowSpec,
       leftWindowProperties.getTimeAttributeType,
-      windowStartEqualityLeftKeys.getInt(0),
-      windowEndEqualityLeftKeys.getInt(0))
+      windowStartEqualityLeftKeys(0),
+      windowEndEqualityLeftKeys(0))
     val rightWindowing = new WindowAttachedWindowingStrategy(
       rightWindowProperties.getWindowSpec,
       rightWindowProperties.getTimeAttributeType,
-      windowStartEqualityRightKeys.getInt(0),
-      windowEndEqualityRightKeys.getInt(0))
+      windowStartEqualityRightKeys(0),
+      windowEndEqualityRightKeys(0))
 
     val newWindowJoin = new StreamPhysicalWindowJoin(
       join.getCluster,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
index 3f68274..b9453f5 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
@@ -20,13 +20,12 @@ package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.ExpressionFormat
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
 import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
 
 import org.apache.calcite.rex.{RexInputRef, RexNode, RexUtil}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.util.ImmutableIntList
-import org.apache.flink.table.planner.plan.nodes.ExpressionFormat
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -79,13 +78,7 @@ object WindowJoinUtil {
    */
   def excludeWindowStartEqualityAndEndEqualityFromJoinCondition(
       join: FlinkLogicalJoin): (
-    ImmutableIntList,
-    ImmutableIntList,
-    ImmutableIntList,
-    ImmutableIntList,
-    ImmutableIntList,
-    ImmutableIntList,
-    RexNode) = {
+    Array[Int], Array[Int], Array[Int], Array[Int], Array[Int], Array[Int], 
RexNode) = {
     val (
       windowStartEqualityLeftKeys,
       windowEndEqualityLeftKeys,
@@ -104,6 +97,7 @@ object WindowJoinUtil {
       val remainLeftKeysArray = mutable.ArrayBuffer[Int]()
       val remainRightKeysArray = mutable.ArrayBuffer[Int]()
+      // convert remaining pairs to RexInputRef tuple for building 
SqlStdOperatorTable.EQUALS calls
+      // or SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
       joinInfo.pairs().foreach { p =>
         if (!windowStartEqualityLeftKeys.contains(p.source) &&
           !windowEndEqualityLeftKeys.contains(p.source)) {
@@ -123,12 +117,12 @@ object WindowJoinUtil {
       }
       val remainAnds = remainEquals ++ joinInfo.nonEquiConditions
       (
-        toImmutableIntList(remainLeftKeysArray),
-        toImmutableIntList(remainRightKeysArray),
+        remainLeftKeysArray.toArray,
+        remainRightKeysArray.toArray,
         // build a new condition
         RexUtil.composeConjunction(rexBuilder, remainAnds.toList))
     } else {
-      (joinInfo.leftKeys, joinInfo.rightKeys, join.getCondition)
+      (joinInfo.leftKeys.toIntArray, joinInfo.rightKeys.toIntArray, 
join.getCondition)
     }
 
     (
@@ -153,8 +147,7 @@ object WindowJoinUtil {
   *         the fourth element is the right join keys of window ends equality.
    */
   private def excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(
-      join: FlinkLogicalJoin): (
-    ImmutableIntList, ImmutableIntList, ImmutableIntList, ImmutableIntList) = {
+      join: FlinkLogicalJoin): (Array[Int], Array[Int], Array[Int], 
Array[Int]) = {
     val joinInfo = join.analyzeCondition()
     val (leftWindowProperties, rightWindowProperties) = 
getChildWindowProperties(join)
 
@@ -216,13 +209,11 @@ object WindowJoinUtil {
     }
 
     (
-      toImmutableIntList(windowStartEqualityLeftKeys),
-      toImmutableIntList(windowEndEqualityLeftKeys),
-      toImmutableIntList(windowStartEqualityRightKeys),
-      toImmutableIntList(windowEndEqualityRightKeys)
+      windowStartEqualityLeftKeys.toArray,
+      windowEndEqualityLeftKeys.toArray,
+      windowStartEqualityRightKeys.toArray,
+      windowEndEqualityRightKeys.toArray
     )
   }
 
-  private def toImmutableIntList(seq: Seq[Int]): ImmutableIntList = 
ImmutableIntList.of(seq: _*)
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index a89d408..0210a61 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -485,6 +485,80 @@ Calc(select=[a, b, c, a0, b0, c0])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinWithIsNotDistinctFrom">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND
+L.a IS NOT DISTINCT FROM R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), IS NOT DISTINCT FROM($0, 
$6))], joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], 
size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+:     +- GlobalWindowAggregate(groupBy=[a], 
window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, 
COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+:        +- Exchange(distribution=[hash[a]])
+:           +- LocalWindowAggregate(groupBy=[a], 
window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS 
count1$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0, 
slice_end('w$) AS $slice_end])
+:              +- Calc(select=[a, c, rowtime])
+:                 +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+:                    +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+:                       +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+      +- GlobalWindowAggregate(groupBy=[a], 
window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, 
COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+         +- Exchange(distribution=[hash[a]])
+            +- LocalWindowAggregate(groupBy=[a], 
window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS 
count1$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0, 
slice_end('w$) AS $slice_end])
+               +- Calc(select=[a, c, rowtime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSemiJoinIN">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
index 04be3fa..b79bb9a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
@@ -1104,4 +1104,41 @@ class WindowJoinTest extends TableTestBase {
       """.stripMargin
     util.verifyRelPlan(sql)
   }
+
+  // 
----------------------------------------------------------------------------------------
+  // Test IS NOT DISTINCT FROM
+  // 
----------------------------------------------------------------------------------------
+
+  @Test
+  def testJoinWithIsNotDistinctFrom(): Unit = {
+    val sql =
+      """
+        |SELECT L.*, R.*
+        |FROM (
+        |  SELECT
+        |    a,
+        |    window_start,
+        |    window_end,
+        |    window_time,
+        |    count(*) as cnt,
+        |    count(distinct c) AS uv
+        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
+        |  GROUP BY a, window_start, window_end, window_time
+        |) L
+        |JOIN (
+        |  SELECT
+        |    a,
+        |    window_start,
+        |    window_end,
+        |    window_time,
+        |    count(*) as cnt,
+        |    count(distinct c) AS uv
+        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL 
'15' MINUTE))
+        |  GROUP BY a, window_start, window_end, window_time
+        |) R
+        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND
+        |L.a IS NOT DISTINCT FROM R.a
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
 }

Reply via email to