Hequn Cheng created FLINK-13446:
-----------------------------------
Summary: Row count sliding window outputs incorrectly in blink planner
Key: FLINK-13446
URL: https://issues.apache.org/jira/browse/FLINK-13446
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.9.0
Reporter: Hequn Cheng
In the blink planner, the row-count sliding window produces incorrect output. The window
assigner assigns fewer windows than expected, so the window emits fewer results. The bug
can be reproduced with the following test:
{code:java}
@Test
def testGroupWindowWithoutKeyInProjection(): Unit = {
  val data = List(
    (1L, 1, "Hi", 1, 1),
    (2L, 2, "Hello", 2, 2),
    (4L, 2, "Hello", 2, 2),
    (8L, 3, "Hello world", 3, 3),
    (16L, 3, "Hello world", 3, 3))
  val stream = failingDataSource(data)
  val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)

  val weightAvgFun = new WeightedAvg
  val countDistinct = new CountDistinct

  // Row-count sliding window: size 2 rows, slide 1 row, on processing time
  val windowedTable = table
    .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
    .groupBy('w, 'int2, 'int3, 'string)
    .select(weightAvgFun('long, 'int), countDistinct('long))

  val sink = new TestingAppendSink
  windowedTable.toAppendStream[Row].addSink(sink)
  env.execute()

  val expected = Seq("12,2", "8,1", "2,1", "3,2", "1,1")
  assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
{code}
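The expected output is Seq("12,2", "8,1", "2,1", "3,2", "1,1"), but the actual
output is only Seq("12,2", "3,2"). The missing results ("8,1", "2,1" and "1,1") are
exactly the ones computed from windows that hold only the first row of a key.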
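To double-check where the expected values come from, here is a small, hypothetical Java model (not Flink code; the class and method names are illustrative only). It assumes the semantics that the expected output implies: per key, a window fires after every `slide` rows and aggregates the last `size` rows, including a partial window at the start. It also assumes WeightedAvg computes sum(long * int) / sum(int) with integer division.

{code:java}
import java.util.*;

/**
 * Hypothetical model, NOT Flink code: per-key count sliding windows that fire
 * every `slide` rows and aggregate the last `size` rows (assumed semantics).
 */
public class CountSlidingWindowModel {

    /** Returns the [start, end] row-index range of every window fired for one key. */
    static List<long[]> firedWindows(long rowCount, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // A window closes after every `slide` rows: at row index slide-1, 2*slide-1, ...
        for (long end = slide - 1; end < rowCount; end += slide) {
            // It covers at most the last `size` rows (partial at the start of the key).
            long start = Math.max(0, end - size + 1);
            windows.add(new long[] {start, end});
        }
        return windows;
    }

    /** Computes "weightedAvg,countDistinct" for each fired window of one key's rows {long, int}. */
    static List<String> evaluate(long[][] rows, long size, long slide) {
        List<String> out = new ArrayList<>();
        for (long[] w : firedWindows(rows.length, size, slide)) {
            long weightedSum = 0;
            long weightSum = 0;
            Set<Long> distinct = new HashSet<>();
            for (int i = (int) w[0]; i <= w[1]; i++) {
                weightedSum += rows[i][0] * rows[i][1]; // assumed WeightedAvg: sum(value * weight)
                weightSum += rows[i][1];                //                      / sum(weight)
                distinct.add(rows[i][0]);               // CountDistinct on 'long
            }
            out.add((weightedSum / weightSum) + "," + distinct.size());
        }
        return out;
    }

    public static void main(String[] args) {
        // Test data grouped by key (int2, int3, string); each row is {long, int}.
        Map<String, long[][]> byKey = new LinkedHashMap<>();
        byKey.put("1,1,Hi", new long[][] {{1, 1}});
        byKey.put("2,2,Hello", new long[][] {{2, 2}, {4, 2}});
        byKey.put("3,3,Hello world", new long[][] {{8, 3}, {16, 3}});

        List<String> results = new ArrayList<>();
        for (long[][] rows : byKey.values()) {
            results.addAll(evaluate(rows, 2, 1)); // Slide over 2.rows every 1.rows
        }
        Collections.sort(results);
        System.out.println(results); // prints [1,1, 12,2, 2,1, 3,2, 8,1]
    }
}
{code}

With size = 2 and slide = 1, each key fires once after its first row and once after its first two rows, which yields the five expected results. Dropping the single-row windows leaves exactly the reported actual output.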
To fix the problem, we can correct the window-assignment logic in
CountSlidingWindowAssigner.assignWindows.
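The following hypothetical helper sketches the per-row assignment under the same assumed semantics. It is not the code of CountSlidingWindowAssigner, whose internals the issue does not show. Each window is identified by the 0-based index of its last row, a window closes after every `slide` rows, and it spans `size` rows. Row `count` of a key therefore belongs to every closing index in [count, count + size - 1], i.e. `size / slide` windows when `slide` divides `size`.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, NOT Flink's CountSlidingWindowAssigner: lists the windows
 * a row belongs to, naming each window by the 0-based index of its last row.
 * Assumes windows close after every `slide` rows and span `size` rows.
 */
public class CountWindowAssignment {

    static List<Long> assignWindowEnds(long count, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        // Smallest closing index >= count, i.e. the smallest end with (end + 1) % slide == 0.
        long firstEnd = (count / slide + 1) * slide - 1;
        // The row stays inside every window whose end lies in [count, count + size - 1].
        for (long end = firstEnd; end <= count + size - 1; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // size = 2, slide = 1, as in the test above.
        System.out.println(assignWindowEnds(0, 2, 1)); // prints [0, 1]
        System.out.println(assignWindowEnds(1, 2, 1)); // prints [1, 2]
    }
}
{code}

In this model the first row of a key lands in two windows: the single-row window closing at index 0 and the two-row window closing at index 1. (The window closing at index 2 never fires in the test, because each key has only two rows.) Assigning the first row to the two-row window alone would lose exactly the single-row results "8,1", "2,1" and "1,1".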