wuchong commented on a change in pull request #12028:
URL: https://github.com/apache/flink/pull/12028#discussion_r433842050
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
##########
@@ -319,6 +319,48 @@ class WindowAggregateITCase(mode: StateBackendMode)
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
+ // Verifies that compilation works when a constant exists in the
+ // group window key (FLINK-17553).
+ @Test
+ def testWindowAggregateOnConstantValue(): Unit = {
+ val ddl1 =
+ """
+ |CREATE TABLE src (
+ | log_ts STRING,
+ | ts TIMESTAMP(3),
+ | a INT,
+ | b DOUBLE,
+ | rowtime AS CAST(log_ts AS TIMESTAMP(3)),
+ | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val ddl2 =
+ """
+ |CREATE TABLE dst (
+ | ts TIMESTAMP(3),
+ | a BIGINT,
+ | b DOUBLE
+ |) WITH (
+ | 'connector.type' = 'filesystem',
+ | 'connector.path' = '/tmp/1',
+ | 'format.type' = 'csv'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |INSERT INTO dst
+ |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
+ |FROM src
+ | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
+ """.stripMargin
+ tEnv.sqlUpdate(ddl1)
+ tEnv.sqlUpdate(ddl2)
+ tEnv.sqlUpdate(query)
+ tEnv.explain(true)
Review comment:
I think we should verify the result; otherwise, this test doesn't need
to be added to an ITCase.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org