godfreyhe commented on a change in pull request #12028:
URL: https://github.com/apache/flink/pull/12028#discussion_r430134412



##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
##########
@@ -319,6 +319,49 @@ class WindowAggregateITCase(mode: StateBackendMode)
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+  // used to verify compile works normally when constants exists in group 
window key (FLINK-17553)
+  @Test
+  def testWindowAggregateOnConstantValue(): Unit = {
+    val ddl1 =
+      """
+        |CREATE TABLE src (
+        |  log_ts STRING,
+        |  ts TIMESTAMP(3),
+        |  a INT,
+        |  b DOUBLE,
+        |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
+        |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
+        |) WITH (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val ddl2 =
+      """
+        |CREATE TABLE dst (
+        |  ts TIMESTAMP(3),
+        |  a BIGINT,
+        |  b DOUBLE
+        |) WITH (
+        |  'connector.type' = 'filesystem',
+        |  'connector.path' = '/tmp/1',
+        |  'format.type' = 'csv'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |INSERT INTO dst
+        |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
+        |FROM src
+        | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
+      """.stripMargin
+    tEnv.sqlUpdate(ddl1)
+    tEnv.sqlUpdate(ddl2)
+    tEnv.sqlUpdate(query)
+    tEnv.explain(true)
+  }
+

Review comment:
       nit: this empty line is redundant; please remove it

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
##########
@@ -109,6 +109,8 @@ object FlinkStreamRuleSets {
       List(
         //removes constant keys from an Agg
         AggregateProjectPullUpConstantsRule.INSTANCE,
+        //fix: FLINK-17553 unsupported call error when constant exists in 
group window key
+        ProjectMergeRule.INSTANCE,

Review comment:
       Please extend this code comment with the following sentence: "This rule 
merges the Project generated by `AggregateProjectPullUpConstantsRule`, so that the 
window aggregate can be correctly rewritten by `StreamLogicalWindowAggregateRule`."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to