Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r184405329
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
    @@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
         (8L, 8, "Hello World"),
         (20L, 20, "Hello World"))
     
    +  @Test
    +  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get5TupleDataStream(env)
    +      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val sqlQuery = "SELECT a, " +
    +      "  SUM(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
    +      "  MIN(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
    +      "  COLLECT(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
    +      "FROM MyTable"
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = List(
    +      "1,1,1,{1=1}",
    +      "2,2,2,{2=1}",
    +      "2,3,1,{1=1, 2=1}",
    +      "3,2,2,{2=1}",
    +      "3,2,2,{2=1}",
    +      "3,5,2,{2=1, 3=1}",
    +      "4,2,2,{2=1}",
    +      "4,3,1,{1=1, 2=1}",
    +      "4,3,1,{1=1, 2=1}",
    +      "4,3,1,{1=1, 2=1}",
    +      "5,1,1,{1=1}",
    +      "5,4,1,{1=1, 3=1}",
    +      "5,4,1,{1=1, 3=1}",
    +      "5,6,1,{1=1, 2=1, 3=1}",
    +      "5,5,2,{2=1, 3=1}")
    +    assertEquals(expected, StreamITCase.testResults)
    +  }
    +
    +  @Test
    +  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get5TupleDataStream(env)
    +      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val sqlQuery = "SELECT a, " +
    +      "  COUNT(e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
    +      "  SUM(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
    +      "  MIN(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
    +      "FROM MyTable"
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = List(
    +      "1,1,1,1",
    +      "2,1,2,2",
    +      "2,2,3,1",
    +      "3,1,2,2",
    +      "3,2,2,2",
    +      "3,3,5,2",
    +      "4,1,2,2",
    +      "4,2,3,1",
    +      "4,3,3,1",
    +      "4,4,3,1",
    +      "5,1,1,1",
    +      "5,2,4,1",
    +      "5,3,4,1",
    +      "5,4,6,1",
    +      "5,5,6,1")
    +    assertEquals(expected, StreamITCase.testResults)
    +  }
    +
    +  @Test
    +  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
    +    // use out-of-order data to test distinct accumulator remove
    +    val data = Seq(
    +      Left((2L, (2L, 2, "Hello"))),
    +      Left((2L, (2L, 2, "Hello"))),
    +      Left((1L, (1L, 1, "Hello"))),
    +      Left((1L, (1L, 1, "Hello"))),
    +      Left((2L, (2L, 2, "Hello"))),
    +      Left((1L, (1L, 1, "Hello"))),
    +      Left((20L, (20L, 20, "Hello World"))), // early row
    +      Right(3L),
    +      Left((2L, (2L, 2, "Hello"))), // late row
    +      Left((3L, (3L, 3, "Hello"))),
    +      Left((4L, (4L, 4, "Hello"))),
    +      Left((5L, (5L, 5, "Hello"))),
    +      Left((6L, (6L, 6, "Hello"))),
    +      Left((7L, (7L, 7, "Hello World"))),
    +      Right(7L),
    +      Left((9L, (9L, 9, "Hello World"))),
    +      Left((8L, (8L, 8, "Hello World"))),
    +      Left((8L, (8L, 8, "Hello World"))),
    +      Right(20L))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    --- End diff --
    
    Actually, I was wrong on this one. Late elements are not handled 
deterministically if p > 1.
    Will change it back.


---

Reply via email to