This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new ae7a730 [FLINK-23827][table-planner] Fix ModifiedMonotonicity
inference for some nodes
ae7a730 is described below
commit ae7a7303f95669d7138221979efae5ff846dc92d
Author: shuo.cs <[email protected]>
AuthorDate: Tue Aug 17 12:36:29 2021 +0800
[FLINK-23827][table-planner] Fix ModifiedMonotonicity inference for some
nodes
This closes #16853
(cherry picked from commit b8c6edb3b68ae920e785ae50de7745d5a37ebe3f)
---
.../metadata/FlinkRelMdModifiedMonotonicity.scala | 25 +++--
.../table/planner/plan/stream/sql/RankTest.xml | 114 +++++++++++++++++++++
.../FlinkRelMdModifiedMonotonicityTest.scala | 10 +-
.../table/planner/plan/stream/sql/RankTest.scala | 105 +++++++++++++++++++
4 files changed, 247 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 675009d..ad5e4e0 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -27,7 +27,7 @@ import
org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, TableAgg
import org.apache.flink.table.planner.plan.nodes.logical._
import
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate,
BatchPhysicalGroupAggregateBase}
import org.apache.flink.table.planner.plan.nodes.physical.stream._
-import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase,
TableSourceTable}
+import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase,
IntermediateRelTable, TableSourceTable}
import org.apache.flink.table.planner.plan.stats.{WithLower, WithUpper}
import org.apache.flink.table.planner.{JByte, JDouble, JFloat, JList, JLong,
JShort}
import org.apache.flink.types.RowKind
@@ -47,7 +47,6 @@ import org.apache.calcite.util.Util
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Date, Time, Timestamp}
import java.util.Collections
-
import scala.collection.JavaConversions._
/**
@@ -71,6 +70,9 @@ class FlinkRelMdModifiedMonotonicity private extends
MetadataHandler[ModifiedMon
new
RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
case _ => null
}
+ case _: FlinkLogicalIntermediateTableScan | _:
StreamPhysicalIntermediateTableScan =>
+ val table = rel.getTable.unwrap(classOf[IntermediateRelTable])
+ table.getStatistic.getRelModifiedMonotonicity
case _ => null
}
@@ -209,10 +211,15 @@ class FlinkRelMdModifiedMonotonicity private extends
MetadataHandler[ModifiedMon
rel: StreamPhysicalDeduplicate,
mq: RelMetadataQuery): RelModifiedMonotonicity = {
if (allAppend(mq, rel.getInput)) {
- val mono = new RelModifiedMonotonicity(
- Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
- rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
- mono
+ if (rel.keepLastRow || rel.isRowtime) {
+ val mono = new RelModifiedMonotonicity(
+ Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
+ rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
+ mono
+ } else {
+ // FirstRow does not generate updates.
+ new
RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(CONSTANT))
+ }
} else {
null
}
@@ -250,6 +257,12 @@ class FlinkRelMdModifiedMonotonicity private extends
MetadataHandler[ModifiedMon
fmq.getRelModifiedMonotonicity(rel.getInput)
}
+ def getRelModifiedMonotonicity(
+ rel: StreamPhysicalLookupJoin,
+ mq: RelMetadataQuery): RelModifiedMonotonicity = {
+ getMonotonicity(rel.getInput, mq, rel.getRowType.getFieldCount)
+ }
+
def getRelModifiedMonotonicity(rel: Aggregate, mq: RelMetadataQuery):
RelModifiedMonotonicity = {
getRelModifiedMonotonicityOnAggregate(rel.getInput, mq,
rel.getAggCallList.toList,
rel.getGroupSet.toArray)
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index ddf67dc..5b0f3d9 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -630,6 +630,48 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1,
]]>
</Resource>
</TestCase>
+ <TestCase name="testUpdatableRankAfterLookupJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM (
+ SELECT name, ids,
+ ROW_NUMBER() OVER (PARTITION BY name ORDER BY ids DESC) as rank_num
+ FROM (
+ SELECT name, SUM(id) FILTER (WHERE id > 0) as ids
+ FROM V1
+ GROUP BY name
+ ))
+WHERE rank_num <= 3
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(name=[$0], ids=[$1], rank_num=[$2])
++- LogicalFilter(condition=[<=($2, 3)])
+ +- LogicalProject(name=[$0], ids=[$1], rank_num=[ROW_NUMBER() OVER
(PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
+ +- LogicalAggregate(group=[{0}], ids=[SUM($1) FILTER $2])
+ +- LogicalProject(name=[$6], id=[$5], $f2=[IS TRUE(>($5, 0))])
+ +- LogicalCorrelate(correlation=[$cor1], joinType=[inner],
requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+ +- LogicalFilter(condition=[=($cor1.a, $0)])
+ +- LogicalSnapshot(period=[$cor1.proctime])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=3], partitionBy=[name], orderBy=[ids DESC],
select=[name, ids, w0$o0])
++- Exchange(distribution=[hash[name]])
+ +- GroupAggregate(groupBy=[name], select=[name, SUM(id) FILTER $f2 AS ids])
+ +- Exchange(distribution=[hash[name]])
+ +- Calc(select=[name, id, IS TRUE(>(id, 0)) AS $f2])
+ +-
LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, id, name])
+ +- Calc(select=[a])
+ +- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTopNOrderByCount">
<Resource name="sql">
<![CDATA[
@@ -1023,4 +1065,76 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1,
]]>
</Resource>
</TestCase>
+ <TestCase name="testUpdatableRankAfterIntermediateScan">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[<($3, 3)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER
(PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
+ +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, MIN(c) AS c])(reuse_id=[1])
++- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[b DESC],
select=[a, b, c])
+ +- Exchange(distribution=[hash[a]])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUpdatableRankWithDeduplicate">
+ <Resource name="sql">
+ <![CDATA[
+SELECT c, b, d
+FROM (
+ SELECT
+ c, b, d,
+ ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1
+) WHERE rn < 10
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$0], b=[$1], d=[$2])
++- LogicalFilter(condition=[<($3, 10)])
+ +- LogicalProject(c=[$0], b=[$1], d=[$2], rn=[ROW_NUMBER() OVER (PARTITION
BY $0, $1 ORDER BY $2 DESC NULLS LAST)])
+ +- LogicalAggregate(group=[{0, 1}], d=[SUM($2) FILTER $3])
+ +- LogicalProject(c=[$2], b=[$1], a=[$0], $f3=[IS TRUE(>($0, 0))])
+ +- LogicalFilter(condition=[=($5, 1)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3],
rowtime=[$4], rowNum=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY PROCTIME()
NULLS FIRST)])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC],
select=[c, b, d])
++- Exchange(distribution=[hash[c, b]])
+ +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM(a) FILTER $f3 AS d])
+ +- Exchange(distribution=[hash[c, b]])
+ +- Calc(select=[c, b, a, IS TRUE(>(a, 0)) AS $f3])
+ +- Deduplicate(keep=[FirstRow], key=[c], order=[PROCTIME])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, b, c, PROCTIME() AS $5])
+ +- DataStreamScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index f2a3235..b6aa05a 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -318,7 +318,7 @@ class FlinkRelMdModifiedMonotonicityTest extends
FlinkRelMdHandlerTestBase {
@Test
def testGetRelMonotonicityOnDeduplicate(): Unit = {
assertEquals(
- new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT,
NOT_MONOTONIC)),
+ new RelModifiedMonotonicity(Array(CONSTANT, CONSTANT, CONSTANT)),
mq.getRelModifiedMonotonicity(streamProcTimeDeduplicateFirstRow))
assertEquals(
@@ -352,5 +352,13 @@ class FlinkRelMdModifiedMonotonicityTest extends
FlinkRelMdHandlerTestBase {
mq.getRelModifiedMonotonicity(streamDropUpdateBefore))
}
+ @Test
+ def testGetRelMonotonicityOnLookupJoin(): Unit = {
+ assertEquals(
+ new RelModifiedMonotonicity(Array(
+ CONSTANT,CONSTANT,CONSTANT,CONSTANT,CONSTANT,
+ CONSTANT,CONSTANT,CONSTANT,CONSTANT,CONSTANT)),
+ mq.getRelModifiedMonotonicity(streamLookupJoin))
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 4871f6c..3527d0b 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -19,7 +19,10 @@ package org.apache.flink.table.planner.plan.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
+import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.junit.Test
@@ -776,5 +779,107 @@ class RankTest extends TableTestBase {
util.verifyExecPlan(sql)
}
+ @Test
+ def testUpdatableRankWithDeduplicate(): Unit = {
+ util.tableEnv.executeSql(
+ """
+ |CREATE VIEW v0 AS
+ |SELECT *
+ |FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY `c`
+ | ORDER BY `PROCTIME`()) AS `rowNum`
+ | FROM MyTable)
+ |WHERE `rowNum` = 1
+ |""".stripMargin)
+ util.tableEnv.executeSql(
+ """
+ |CREATE VIEW v1 AS
+ |SELECT c, b, SUM(a) FILTER (WHERE a > 0) AS d FROM v0 GROUP BY c, b
+ |""".stripMargin)
+ util.verifyRelPlan(
+ """
+ |SELECT c, b, d
+ |FROM (
+ | SELECT
+ | c, b, d,
+ | ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn
FROM v1
+ |) WHERE rn < 10
+ |""".stripMargin)
+ }
+ @Test
+ def testUpdatableRankAfterLookupJoin(): Unit = {
+ util.addTable(
+ s"""
+ |CREATE TABLE LookupTable (
+ | `id` INT,
+ | `name` STRING,
+ | `age` INT
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+ util.tableEnv.executeSql(
+ """
+ |CREATE VIEW V1 AS
+ |SELECT *
+ |FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime
AS D
+ |ON T.a = D.id
+ |""".stripMargin)
+ val sql =
+ s"""
+ |SELECT *
+ |FROM (
+ | SELECT name, ids,
+ | ROW_NUMBER() OVER (PARTITION BY name ORDER BY ids DESC) as
rank_num
+ | FROM (
+ | SELECT name, SUM(id) FILTER (WHERE id > 0) as ids
+ | FROM V1
+ | GROUP BY name
+ | ))
+ |WHERE rank_num <= 3
+ |""".stripMargin
+ util.verifyRelPlan(sql)
+ }
+
+ @Test
+ def testUpdatableRankAfterIntermediateScan(): Unit = {
+ util.tableEnv.getConfig.getConfiguration.setBoolean(
+
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
true)
+ util.tableEnv.executeSql(
+ """
+ |CREATE VIEW v1 AS
+ |SELECT a, MAX(b) AS b, MIN(c) AS c
+ |FROM MyTable GROUP BY a
+ |""".stripMargin)
+
+ util.addTable(
+ s"""
+ |CREATE TABLE sink(
+ | `id` INT,
+ | `name` STRING,
+ | `age` BIGINT,
+ | primary key (id) not enforced
+ |) WITH (
+ | 'connector' = 'values',
+ | 'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ val stmtSet = util.tableEnv.createStatementSet()
+ stmtSet.addInsertSql(
+ """
+ |INSERT INTO sink
+ |SELECT * FROM v1
+ |""".stripMargin)
+ stmtSet.addInsertSql(
+ """
+ |INSERT INTO sink
+ |SELECT a, b, c FROM (
+ | SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn
+ | FROM v1
+ |) WHERE rn < 3
+ |""".stripMargin)
+ util.verifyExecPlan(stmtSet)
+ }
+
// TODO add tests about multi-sinks and udf
}