Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5555#discussion_r184405910
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
---
@@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
(8L, 8, "Hello World"),
(20L, 20, "Hello World"))
+ @Test
+ def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+ " MIN(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+ " COLLECT(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " +
+ "FROM MyTable"
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List(
+ "1,1,1,{1=1}",
+ "2,2,2,{2=1}",
+ "2,3,1,{1=1, 2=1}",
+ "3,2,2,{2=1}",
+ "3,2,2,{2=1}",
+ "3,5,2,{2=1, 3=1}",
+ "4,2,2,{2=1}",
+ "4,3,1,{1=1, 2=1}",
+ "4,3,1,{1=1, 2=1}",
+ "4,3,1,{1=1, 2=1}",
+ "5,1,1,{1=1}",
+ "5,4,1,{1=1, 3=1}",
+ "5,4,1,{1=1, 3=1}",
+ "5,6,1,{1=1, 2=1, 3=1}",
+ "5,5,2,{2=1, 3=1}")
+ assertEquals(expected, StreamITCase.testResults)
+ }
+
+ @Test
+ def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " COUNT(e) OVER (" +
+ " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ " SUM(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ " MIN(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+ "FROM MyTable"
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List(
+ "1,1,1,1",
+ "2,1,2,2",
+ "2,2,3,1",
+ "3,1,2,2",
+ "3,2,2,2",
+ "3,3,5,2",
+ "4,1,2,2",
+ "4,2,3,1",
+ "4,3,3,1",
+ "4,4,3,1",
+ "5,1,1,1",
+ "5,2,4,1",
+ "5,3,4,1",
+ "5,4,6,1",
+ "5,5,6,1")
+ assertEquals(expected, StreamITCase.testResults)
+ }
+
+ @Test
+ def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
+ // use out-of-order data to test distinct accumulator remove
+ val data = Seq(
+ Left((2L, (2L, 2, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((20L, (20L, 20, "Hello World"))), // early row
+ Right(3L),
+ Left((2L, (2L, 2, "Hello"))), // late row
+ Left((3L, (3L, 3, "Hello"))),
+ Left((4L, (4L, 4, "Hello"))),
+ Left((5L, (5L, 5, "Hello"))),
+ Left((6L, (6L, 6, "Hello"))),
+ Left((7L, (7L, 7, "Hello World"))),
+ Right(7L),
+ Left((9L, (9L, 9, "Hello World"))),
+ Left((8L, (8L, 8, "Hello World"))),
+ Left((8L, (8L, 8, "Hello World"))),
+ Right(20L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
--- End diff --
Also, the event-time test is not required. We also test the retract case
with BOUNDED OVER windows (rows that fall out of the window are retracted).
---