This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new bf254ee [FLINK-22098][table-planner-blink] Fix bug for window join: plan is wrong if join condition contains 'IS NOT DISTINCT FROM'
bf254ee is described below
commit bf254ee40ad3db24544dcbd53af146e50118d751
Author: Jing Zhang <[email protected]>
AuthorDate: Fri Apr 2 11:42:27 2021 +0800
[FLINK-22098][table-planner-blink] Fix bug for window join: plan is wrong if join condition contains 'IS NOT DISTINCT FROM'
This closes #15476
(cherry picked from commit 6604c5c65e74224970be1a4c89e2ca20b1c92a0e)
---
.../stream/StreamPhysicalWindowJoinRule.scala | 18 +++---
.../table/planner/plan/utils/WindowJoinUtil.scala | 31 ++++-----
.../plan/stream/sql/join/WindowJoinTest.xml | 74 ++++++++++++++++++++++
.../plan/stream/sql/join/WindowJoinTest.scala | 37 +++++++++++
4 files changed, 130 insertions(+), 30 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala
index ad84dd7..5425e21 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowJoinRule.scala
@@ -29,8 +29,6 @@ import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
-import java.util
-
/**
* Rule to convert a [[FlinkLogicalJoin]] into a [[StreamPhysicalWindowJoin]].
*/
@@ -49,19 +47,19 @@ class StreamPhysicalWindowJoinRule
override def onMatch(call: RelOptRuleCall): Unit = {
def toHashTraitByColumns(
- columns: util.Collection[_ <: Number],
+ columns: Array[Int],
inputTraitSet: RelTraitSet): RelTraitSet = {
- val distribution = if (columns.size() == 0) {
+ val distribution = if (columns.isEmpty) {
FlinkRelDistribution.SINGLETON
} else {
- FlinkRelDistribution.hash(columns)
+ FlinkRelDistribution.hash(columns, true)
}
inputTraitSet
.replace(FlinkConventions.STREAM_PHYSICAL)
.replace(distribution)
}
- def convertInput(input: RelNode, columns: util.Collection[_ <: Number]): RelNode = {
+ def convertInput(input: RelNode, columns: Array[Int]): RelNode = {
val requiredTraitSet = toHashTraitByColumns(columns, input.getTraitSet)
RelOptRule.convert(input, requiredTraitSet)
}
@@ -89,13 +87,13 @@ class StreamPhysicalWindowJoinRule
val leftWindowing = new WindowAttachedWindowingStrategy(
leftWindowProperties.getWindowSpec,
leftWindowProperties.getTimeAttributeType,
- windowStartEqualityLeftKeys.getInt(0),
- windowEndEqualityLeftKeys.getInt(0))
+ windowStartEqualityLeftKeys(0),
+ windowEndEqualityLeftKeys(0))
val rightWindowing = new WindowAttachedWindowingStrategy(
rightWindowProperties.getWindowSpec,
rightWindowProperties.getTimeAttributeType,
- windowStartEqualityRightKeys.getInt(0),
- windowEndEqualityRightKeys.getInt(0))
+ windowStartEqualityRightKeys(0),
+ windowEndEqualityRightKeys(0))
val newWindowJoin = new StreamPhysicalWindowJoin(
join.getCluster,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
index 3f68274..b9453f5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
@@ -20,13 +20,12 @@ package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.ExpressionFormat
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
import org.apache.calcite.rex.{RexInputRef, RexNode, RexUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.util.ImmutableIntList
-import org.apache.flink.table.planner.plan.nodes.ExpressionFormat
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -79,13 +78,7 @@ object WindowJoinUtil {
*/
def excludeWindowStartEqualityAndEndEqualityFromJoinCondition(
join: FlinkLogicalJoin): (
- ImmutableIntList,
- ImmutableIntList,
- ImmutableIntList,
- ImmutableIntList,
- ImmutableIntList,
- ImmutableIntList,
- RexNode) = {
+ Array[Int], Array[Int], Array[Int], Array[Int], Array[Int], Array[Int], RexNode) = {
val (
windowStartEqualityLeftKeys,
windowEndEqualityLeftKeys,
@@ -104,6 +97,7 @@ object WindowJoinUtil {
val remainLeftKeysArray = mutable.ArrayBuffer[Int]()
val remainRightKeysArray = mutable.ArrayBuffer[Int]()
// convert remain pairs to RexInputRef tuple for building SqlStdOperatorTable.EQUALS calls
+ // or SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
joinInfo.pairs().foreach { p =>
if (!windowStartEqualityLeftKeys.contains(p.source) &&
!windowEndEqualityLeftKeys.contains(p.source)) {
@@ -123,12 +117,12 @@ object WindowJoinUtil {
}
val remainAnds = remainEquals ++ joinInfo.nonEquiConditions
(
- toImmutableIntList(remainLeftKeysArray),
- toImmutableIntList(remainRightKeysArray),
+ remainLeftKeysArray.toArray,
+ remainRightKeysArray.toArray,
// build a new condition
RexUtil.composeConjunction(rexBuilder, remainAnds.toList))
} else {
- (joinInfo.leftKeys, joinInfo.rightKeys, join.getCondition)
+ (joinInfo.leftKeys.toIntArray, joinInfo.rightKeys.toIntArray, join.getCondition)
}
(
@@ -153,8 +147,7 @@ object WindowJoinUtil {
* the forth element is right join keys of window ends equality.
*/
private def excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(
- join: FlinkLogicalJoin): (
- ImmutableIntList, ImmutableIntList, ImmutableIntList, ImmutableIntList) = {
+ join: FlinkLogicalJoin): (Array[Int], Array[Int], Array[Int], Array[Int]) = {
val joinInfo = join.analyzeCondition()
val (leftWindowProperties, rightWindowProperties) =
getChildWindowProperties(join)
@@ -216,13 +209,11 @@ object WindowJoinUtil {
}
(
- toImmutableIntList(windowStartEqualityLeftKeys),
- toImmutableIntList(windowEndEqualityLeftKeys),
- toImmutableIntList(windowStartEqualityRightKeys),
- toImmutableIntList(windowEndEqualityRightKeys)
+ windowStartEqualityLeftKeys.toArray,
+ windowEndEqualityLeftKeys.toArray,
+ windowStartEqualityRightKeys.toArray,
+ windowEndEqualityRightKeys.toArray
)
}
- private def toImmutableIntList(seq: Seq[Int]): ImmutableIntList = ImmutableIntList.of(seq: _*)
-
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index a89d408..0210a61 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -485,6 +485,80 @@ Calc(select=[a, b, c, a0, b0, c0])
]]>
</Resource>
</TestCase>
+ <TestCase name="testJoinWithIsNotDistinctFrom">
+ <Resource name="sql">
+ <![CDATA[
+SELECT L.*, R.*
+FROM (
+ SELECT
+ a,
+ window_start,
+ window_end,
+ window_time,
+ count(*) as cnt,
+ count(distinct c) AS uv
+ FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+ SELECT
+ a,
+ window_start,
+ window_end,
+ window_time,
+ count(*) as cnt,
+ count(distinct c) AS uv
+ FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND
+L.a IS NOT DISTINCT FROM R.a
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), IS NOT DISTINCT FROM($0, $6))], joinType=[inner])
+ :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)])
+ : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2])
+ : +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+ : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)])
+ +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+: +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+: +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+: +- Exchange(distribution=[hash[a]])
+: +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS count1$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])
+: +- Calc(select=[a, c, rowtime])
+: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+: +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+ +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, COUNT(count1$0) AS cnt, COUNT(distinct$0 count$1) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS count1$0, COUNT(distinct$0 c) AS count$1, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])
+ +- Calc(select=[a, c, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSemiJoinIN">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
index 04be3fa..b79bb9a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
@@ -1104,4 +1104,41 @@ class WindowJoinTest extends TableTestBase {
""".stripMargin
util.verifyRelPlan(sql)
}
+
+ // ----------------------------------------------------------------------------------------
+ // Test IS NOT DISTINCT FROM
+ // ----------------------------------------------------------------------------------------
+
+ @Test
+ def testJoinWithIsNotDistinctFrom(): Unit = {
+ val sql =
+ """
+ |SELECT L.*, R.*
+ |FROM (
+ | SELECT
+ | a,
+ | window_start,
+ | window_end,
+ | window_time,
+ | count(*) as cnt,
+ | count(distinct c) AS uv
+ | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ | GROUP BY a, window_start, window_end, window_time
+ |) L
+ |JOIN (
+ | SELECT
+ | a,
+ | window_start,
+ | window_end,
+ | window_time,
+ | count(*) as cnt,
+ | count(distinct c) AS uv
+ | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ | GROUP BY a, window_start, window_end, window_time
+ |) R
+ |ON L.window_start = R.window_start AND L.window_end = R.window_end AND
+ |L.a IS NOT DISTINCT FROM R.a
+ """.stripMargin
+ util.verifyRelPlan(sql)
+ }
}