This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 677021c  [FLINK-18440][table-planner-blink] ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
677021c is described below

commit 677021cc1ca872dd81fed8cd5e5aea834a3b5236
Author: yuzhao.cyz <[email protected]>
AuthorDate: Fri Jul 10 19:03:47 2020 +0800

    [FLINK-18440][table-planner-blink] ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
    
    This closes #12868
---
 .../operations/SqlToOperationConverter.java        | 11 +++++--
 .../table/planner/plan/batch/sql/RankTest.xml      | 29 ++++++++++++++++++
 .../table/planner/plan/stream/sql/RankTest.xml     | 26 +++++++++++++++++
 .../table/planner/plan/batch/sql/RankTest.scala    | 34 ++++++++++++++++++++++
 .../table/planner/plan/stream/sql/RankTest.scala   | 34 ++++++++++++++++++++++
 5 files changed, 132 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index be54497..fa04539 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -659,6 +659,15 @@ public class SqlToOperationConverter {
                final SqlNodeList fieldList = sqlCreateView.getFieldList();
 
                SqlNode validateQuery = flinkPlanner.validate(query);
+               // Put the sql string unparse (getQuotedSqlString()) in front of
+               // the node conversion (toQueryOperation()),
+               // because in Calcite 1.22.0 and earlier, during sql-to-rel conversion, the SqlWindow
+               // bounds state would be mutated to the default when they are null (not specified).
+
+               // This bug is fixed in CALCITE-3877 of Calcite 1.23.0.
+               String originalQuery = getQuotedSqlString(query);
+               String expandedQuery = getQuotedSqlString(validateQuery);
+
                PlannerQueryOperation operation = 
toQueryOperation(flinkPlanner, validateQuery);
                TableSchema schema = operation.getTableSchema();
 
@@ -681,8 +690,6 @@ public class SqlToOperationConverter {
                        schema = TableSchema.builder().fields(aliasFieldNames, 
inputFieldTypes).build();
                }
 
-               String originalQuery = getQuotedSqlString(query);
-               String expandedQuery = getQuotedSqlString(validateQuery);
                String comment = sqlCreateView.getComment().map(c -> 
c.getNlsString().getValue()).orElse(null);
                CatalogView catalogView = new CatalogViewImpl(originalQuery,
                                expandedQuery,
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
index b1cc4aa..743263c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
@@ -214,4 +214,33 @@ Calc(select=[a, b, $2])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testCreateViewWithRowNumber">
+    <Resource name="sql">
+      <![CDATA[insert into sink select name, eat, cnt
+from view2 where row_num <= 3]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[name, eat, 
cnt])
++- LogicalProject(name=[$0], eat=[$1], cnt=[$2])
+   +- LogicalFilter(condition=[<=($3, 3)])
+      +- LogicalProject(name=[$0], eat=[$1], cnt=[$2], row_num=[ROW_NUMBER() 
OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW)])
+         +- LogicalAggregate(group=[{0, 1}], cnt=[SUM($2)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
test_source]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
++- Calc(select=[name, eat, cnt], where=[<=(w0$o0, 3)])
+   +- OverAggregate(partitionBy=[name], orderBy=[cnt DESC], 
window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[name, eat, cnt, w0$o0])
+      +- Sort(orderBy=[name ASC, cnt DESC])
+         +- Exchange(distribution=[hash[name]])
+            +- HashAggregate(isMerge=[true], groupBy=[name, eat], 
select=[name, eat, Final_SUM(sum$0) AS cnt])
+               +- Exchange(distribution=[hash[name, eat]])
+                  +- LocalHashAggregate(groupBy=[name, eat], select=[name, 
eat, Partial_SUM(age) AS sum$0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, test_source]], fields=[name, eat, age])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 208b241..7f4c655 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -817,4 +817,30 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testCreateViewWithRowNumber">
+    <Resource name="sql">
+      <![CDATA[insert into sink select name, eat, cnt
+from view2 where row_num <= 3]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[name, eat, 
cnt])
++- LogicalProject(name=[$0], eat=[$1], cnt=[$2])
+   +- LogicalFilter(condition=[<=($3, 3)])
+      +- LogicalProject(name=[$0], eat=[$1], cnt=[$2], row_num=[ROW_NUMBER() 
OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW)])
+         +- LogicalAggregate(group=[{0, 1}], cnt=[SUM($2)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
test_source]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[name, eat, cnt])
++- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=3], partitionBy=[name], orderBy=[cnt DESC], 
select=[name, eat, cnt])
+   +- Exchange(distribution=[hash[name]])
+      +- GroupAggregate(groupBy=[name, eat], select=[name, eat, SUM(age) AS 
cnt])
+         +- Exchange(distribution=[hash[name, eat]])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
test_source]], fields=[name, eat, age])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala
index 62fb3f6..01e89f5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RankTest.scala
@@ -167,4 +167,38 @@ class RankTest extends TableTestBase {
       """.stripMargin
     util.verifyPlan(sqlQuery)
   }
+
+  @Test
+  def testCreateViewWithRowNumber(): Unit = {
+    util.addTable(
+      """
+        |CREATE TABLE test_source (
+        |  name STRING,
+        |  eat STRING,
+        |  age BIGINT
+        |) WITH (
+        |  'connector' = 'values',
+        |  'bounded' = 'true'
+        |)
+      """.stripMargin)
+    util.tableEnv.executeSql("create view view1 as select name, eat ,sum(age) 
as cnt\n"
+      + "from test_source group by name, eat")
+    util.tableEnv.executeSql("create view view2 as\n"
+      + "select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) as 
row_num\n"
+      + "from view1")
+    util.addTable(
+      s"""
+         |create table sink (
+         |  name varchar,
+         |  eat varchar,
+         |  cnt bigint
+         |)
+         |with(
+         |  'connector' = 'print'
+         |)
+         |""".stripMargin
+    )
+    util.verifyPlanInsert("insert into sink select name, eat, cnt\n"
+      + "from view2 where row_num <= 3")
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 37cbb21..064cbf3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -638,5 +638,39 @@ class RankTest extends TableTestBase {
     util.verifyPlan(sql, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testCreateViewWithRowNumber(): Unit = {
+    util.addTable(
+      """
+        |CREATE TABLE test_source (
+        |  name STRING,
+        |  eat STRING,
+        |  age BIGINT
+        |) WITH (
+        |  'connector' = 'values',
+        |  'bounded' = 'false'
+        |)
+      """.stripMargin)
+    util.tableEnv.executeSql("create view view1 as select name, eat ,sum(age) 
as cnt\n"
+      + "from test_source group by name, eat")
+    util.tableEnv.executeSql("create view view2 as\n"
+      + "select *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) as 
row_num\n"
+      + "from view1")
+    util.addTable(
+      s"""
+         |create table sink (
+         |  name varchar,
+         |  eat varchar,
+         |  cnt bigint
+         |)
+         |with(
+         |  'connector' = 'print'
+         |)
+         |""".stripMargin
+    )
+    util.verifyPlanInsert("insert into sink select name, eat, cnt\n"
+      + "from view2 where row_num <= 3")
+  }
+
   // TODO add tests about multi-sinks and udf
 }

Reply via email to