This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new ae7a730  [FLINK-23827][table-planner] Fix ModifiedMonotonicity inference for some nodes
ae7a730 is described below

commit ae7a7303f95669d7138221979efae5ff846dc92d
Author: shuo.cs <[email protected]>
AuthorDate: Tue Aug 17 12:36:29 2021 +0800

    [FLINK-23827][table-planner] Fix ModifiedMonotonicity inference for some nodes
    
    This closes #16853
    
    (cherry picked from commit b8c6edb3b68ae920e785ae50de7745d5a37ebe3f)
---
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  25 +++--
 .../table/planner/plan/stream/sql/RankTest.xml     | 114 +++++++++++++++++++++
 .../FlinkRelMdModifiedMonotonicityTest.scala       |  10 +-
 .../table/planner/plan/stream/sql/RankTest.scala   | 105 +++++++++++++++++++
 4 files changed, 247 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 675009d..ad5e4e0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, TableAgg
 import org.apache.flink.table.planner.plan.nodes.logical._
 import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate,
 BatchPhysicalGroupAggregateBase}
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
-import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, 
TableSourceTable}
+import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, 
IntermediateRelTable, TableSourceTable}
 import org.apache.flink.table.planner.plan.stats.{WithLower, WithUpper}
 import org.apache.flink.table.planner.{JByte, JDouble, JFloat, JList, JLong, 
JShort}
 import org.apache.flink.types.RowKind
@@ -47,7 +47,6 @@ import org.apache.calcite.util.Util
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 import java.util.Collections
-
 import scala.collection.JavaConversions._
 
 /**
@@ -71,6 +70,9 @@ class FlinkRelMdModifiedMonotonicity private extends 
MetadataHandler[ModifiedMon
             new 
RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
           case _ => null
         }
+      case _: FlinkLogicalIntermediateTableScan | _: 
StreamPhysicalIntermediateTableScan =>
+        val table = rel.getTable.unwrap(classOf[IntermediateRelTable])
+        table.getStatistic.getRelModifiedMonotonicity
       case _ => null
     }
 
@@ -209,10 +211,15 @@ class FlinkRelMdModifiedMonotonicity private extends 
MetadataHandler[ModifiedMon
       rel: StreamPhysicalDeduplicate,
       mq: RelMetadataQuery): RelModifiedMonotonicity = {
     if (allAppend(mq, rel.getInput)) {
-      val mono = new RelModifiedMonotonicity(
-        Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
-      rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
-      mono
+      if (rel.keepLastRow || rel.isRowtime) {
+        val mono = new RelModifiedMonotonicity(
+          Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
+        rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
+        mono
+      } else {
+        // FirstRow does not generate updates.
+        new 
RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(CONSTANT))
+      }
     } else {
       null
     }
@@ -250,6 +257,12 @@ class FlinkRelMdModifiedMonotonicity private extends 
MetadataHandler[ModifiedMon
     fmq.getRelModifiedMonotonicity(rel.getInput)
   }
 
+  def getRelModifiedMonotonicity(
+      rel: StreamPhysicalLookupJoin,
+      mq: RelMetadataQuery): RelModifiedMonotonicity = {
+    getMonotonicity(rel.getInput, mq, rel.getRowType.getFieldCount)
+  }
+
   def getRelModifiedMonotonicity(rel: Aggregate, mq: RelMetadataQuery): 
RelModifiedMonotonicity = {
     getRelModifiedMonotonicityOnAggregate(rel.getInput, mq, 
rel.getAggCallList.toList,
       rel.getGroupSet.toArray)
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index ddf67dc..5b0f3d9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -630,6 +630,48 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testUpdatableRankAfterLookupJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM (
+  SELECT name, ids,
+      ROW_NUMBER() OVER (PARTITION BY name ORDER BY ids DESC) as rank_num
+  FROM (
+     SELECT name, SUM(id) FILTER (WHERE id > 0) as ids
+     FROM V1
+     GROUP BY name
+  ))
+WHERE rank_num <= 3
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], ids=[$1], rank_num=[$2])
++- LogicalFilter(condition=[<=($2, 3)])
+   +- LogicalProject(name=[$0], ids=[$1], rank_num=[ROW_NUMBER() OVER 
(PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
+      +- LogicalAggregate(group=[{0}], ids=[SUM($1) FILTER $2])
+         +- LogicalProject(name=[$6], id=[$5], $f2=[IS TRUE(>($5, 0))])
+            +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 3}])
+               :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+               +- LogicalFilter(condition=[=($cor1.a, $0)])
+                  +- LogicalSnapshot(period=[$cor1.proctime])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=3], partitionBy=[name], orderBy=[ids DESC], 
select=[name, ids, w0$o0])
++- Exchange(distribution=[hash[name]])
+   +- GroupAggregate(groupBy=[name], select=[name, SUM(id) FILTER $f2 AS ids])
+      +- Exchange(distribution=[hash[name]])
+         +- Calc(select=[name, id, IS TRUE(>(id, 0)) AS $f2])
+            +- 
LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, id, name])
+               +- Calc(select=[a])
+                  +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTopNOrderByCount">
     <Resource name="sql">
       <![CDATA[
@@ -1023,4 +1065,76 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testUpdatableRankAfterIntermediateScan">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[<($3, 3)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER 
(PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
+         +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2])
+               +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, MIN(c) AS c])(reuse_id=[1])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, b, c])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[b DESC], 
select=[a, b, c])
+   +- Exchange(distribution=[hash[a]])
+      +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpdatableRankWithDeduplicate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT c, b, d
+FROM (
+    SELECT
+       c, b, d,
+       ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1
+) WHERE rn < 10
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$0], b=[$1], d=[$2])
++- LogicalFilter(condition=[<($3, 10)])
+   +- LogicalProject(c=[$0], b=[$1], d=[$2], rn=[ROW_NUMBER() OVER (PARTITION 
BY $0, $1 ORDER BY $2 DESC NULLS LAST)])
+      +- LogicalAggregate(group=[{0, 1}], d=[SUM($2) FILTER $3])
+         +- LogicalProject(c=[$2], b=[$1], a=[$0], $f3=[IS TRUE(>($0, 0))])
+            +- LogicalFilter(condition=[=($5, 1)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY PROCTIME() 
NULLS FIRST)])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], 
select=[c, b, d])
++- Exchange(distribution=[hash[c, b]])
+   +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM(a) FILTER $f3 AS d])
+      +- Exchange(distribution=[hash[c, b]])
+         +- Calc(select=[c, b, a, IS TRUE(>(a, 0)) AS $f3])
+            +- Deduplicate(keep=[FirstRow], key=[c], order=[PROCTIME])
+               +- Exchange(distribution=[hash[c]])
+                  +- Calc(select=[a, b, c, PROCTIME() AS $5])
+                     +- DataStreamScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index f2a3235..b6aa05a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -318,7 +318,7 @@ class FlinkRelMdModifiedMonotonicityTest extends 
FlinkRelMdHandlerTestBase {
   @Test
   def testGetRelMonotonicityOnDeduplicate(): Unit = {
     assertEquals(
-      new RelModifiedMonotonicity(Array(NOT_MONOTONIC, CONSTANT, 
NOT_MONOTONIC)),
+      new RelModifiedMonotonicity(Array(CONSTANT, CONSTANT, CONSTANT)),
       mq.getRelModifiedMonotonicity(streamProcTimeDeduplicateFirstRow))
 
     assertEquals(
@@ -352,5 +352,13 @@ class FlinkRelMdModifiedMonotonicityTest extends 
FlinkRelMdHandlerTestBase {
       mq.getRelModifiedMonotonicity(streamDropUpdateBefore))
   }
 
+  @Test
+  def testGetRelMonotonicityOnLookupJoin(): Unit = {
+    assertEquals(
+      new RelModifiedMonotonicity(Array(
+        CONSTANT,CONSTANT,CONSTANT,CONSTANT,CONSTANT,
+        CONSTANT,CONSTANT,CONSTANT,CONSTANT,CONSTANT)),
+      mq.getRelModifiedMonotonicity(streamLookupJoin))
+  }
 }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 4871f6c..3527d0b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -19,7 +19,10 @@ package org.apache.flink.table.planner.plan.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 
 import org.junit.Test
 
@@ -776,5 +779,107 @@ class RankTest extends TableTestBase {
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testUpdatableRankWithDeduplicate(): Unit = {
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW v0 AS
+        |SELECT *
+        |FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY `c`
+        |        ORDER BY `PROCTIME`()) AS `rowNum`
+        |        FROM MyTable)
+        |WHERE `rowNum` = 1
+        |""".stripMargin)
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW v1 AS
+        |SELECT c, b, SUM(a) FILTER (WHERE a > 0) AS d FROM v0 GROUP BY c, b
+        |""".stripMargin)
+    util.verifyRelPlan(
+      """
+        |SELECT c, b, d
+        |FROM (
+        |    SELECT
+        |       c, b, d,
+        |       ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn 
FROM v1
+        |) WHERE rn < 10
+        |""".stripMargin)
+  }
+  @Test
+  def testUpdatableRankAfterLookupJoin(): Unit = {
+    util.addTable(
+      s"""
+         |CREATE TABLE LookupTable (
+         |  `id` INT,
+         |  `name` STRING,
+         |  `age` INT
+         |) WITH (
+         |  'connector' = 'values'
+         |)
+         |""".stripMargin)
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW V1 AS
+        |SELECT *
+        |FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime 
AS D
+        |ON T.a = D.id
+        |""".stripMargin)
+    val sql =
+      s"""
+         |SELECT *
+         |FROM (
+         |  SELECT name, ids,
+         |      ROW_NUMBER() OVER (PARTITION BY name ORDER BY ids DESC) as 
rank_num
+         |  FROM (
+         |     SELECT name, SUM(id) FILTER (WHERE id > 0) as ids
+         |     FROM V1
+         |     GROUP BY name
+         |  ))
+         |WHERE rank_num <= 3
+         |""".stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testUpdatableRankAfterIntermediateScan(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setBoolean(
+      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
 true)
+    util.tableEnv.executeSql(
+      """
+        |CREATE VIEW v1 AS
+        |SELECT a, MAX(b) AS b, MIN(c) AS c
+        |FROM MyTable GROUP BY a
+        |""".stripMargin)
+
+    util.addTable(
+      s"""
+         |CREATE TABLE sink(
+         |  `id` INT,
+         |  `name` STRING,
+         |  `age` BIGINT,
+         |   primary key (id) not enforced
+         |) WITH (
+         |  'connector' = 'values',
+         |  'sink-insert-only' = 'false'
+         |)
+         |""".stripMargin)
+
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      """
+        |INSERT INTO sink
+        |SELECT * FROM v1
+        |""".stripMargin)
+    stmtSet.addInsertSql(
+      """
+        |INSERT INTO sink
+        |SELECT a, b, c FROM (
+        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn
+        |  FROM v1
+        |) WHERE rn < 3
+        |""".stripMargin)
+    util.verifyExecPlan(stmtSet)
+  }
+
   // TODO add tests about multi-sinks and udf
 }

Reply via email to