Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186033706 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase { (8000L, "8", "Hello World"), (20000L, "20", "Hello World")) + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { + // create a watermark with 10ms offset to delay the window emission by 10ms to verify merge + val sessionWindowTestdata = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (8L, 8, "Hello"), + (9L, 9, "Hello World"), + (4L, 4, "Hello"), + (16L, 16, "Hello")) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + StreamITCase.clear + val stream = env + .fromCollection(sessionWindowTestdata) + .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L)) + + val tEnv = TableEnvironment.getTableEnvironment(env) + val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime) + tEnv.registerTable("MyTable", table) + + val sqlQuery = "SELECT string, " + + " COUNT(DISTINCT long) " + --- End diff -- It would be good to add the end timestamp of the windows (`SESSION_END(rowtime, INTERVAL '0.005' SECOND)`) to make it easier to eyeball the expected test results.
---