[ https://issues.apache.org/jira/browse/FLINK-13446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897140#comment-16897140 ]
Timo Walther commented on FLINK-13446:
--------------------------------------
I just had an offline discussion with [~aljoscha] about this topic. Flink's
current behavior has historical reasons and is also determined by how count
windows are currently implemented. We both agree that Blink's behavior is more
intuitive. +1 for option (1) [~jark].
> Row count sliding window outputs incorrectly in blink planner
> -------------------------------------------------------------
>
> Key: FLINK-13446
> URL: https://issues.apache.org/jira/browse/FLINK-13446
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.9.0
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.9.0, 1.10.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> For the Blink planner, the row-count sliding window produces incorrect output.
> The window assigner assigns fewer windows than expected, so the window emits
> fewer results. The bug can be reproduced with the following test:
> {code:java}
> @Test
> def testGroupWindowWithoutKeyInProjection(): Unit = {
>   val data = List(
>     (1L, 1, "Hi", 1, 1),
>     (2L, 2, "Hello", 2, 2),
>     (4L, 2, "Hello", 2, 2),
>     (8L, 3, "Hello world", 3, 3),
>     (16L, 3, "Hello world", 3, 3))
>   val stream = failingDataSource(data)
>   val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3,
>     'proctime.proctime)
>   val weightAvgFun = new WeightedAvg
>   val countDistinct = new CountDistinct
>   val windowedTable = table
>     .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
>     .groupBy('w, 'int2, 'int3, 'string)
>     .select(weightAvgFun('long, 'int), countDistinct('long))
>   val sink = new TestingAppendSink
>   windowedTable.toAppendStream[Row].addSink(sink)
>   env.execute()
>   val expected = Seq("12,2", "8,1", "2,1", "3,2", "1,1")
>   assertEquals(expected.sorted, sink.getAppendResults.sorted)
> }
> {code}
> The expected output is Seq("12,2", "8,1", "2,1", "3,2", "1,1"), while the
> actual output is Seq("12,2", "3,2").
> To fix the problem, we can correct the assignment logic in
> CountSlidingWindowAssigner.assignWindows.
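The difference between the expected and the actual output comes down to whether a row-count sliding window may fire before it holds `size` rows. The Scala sketch below is a hypothetical model, not Flink's `CountSlidingWindowAssigner`, and it does not decide which behavior is correct. `assign` maps a row's 0-based position within its group to the ids of the windows it belongs to (a window id is the index of the window's last row), and `replay` fires window `e` when row `e` arrives. The names `CountSlidingSketch`, `Rec`, `assign`, `replay` and `emitPartial` are all illustrative. The aggregates re-implement the test's `WeightedAvg` and `CountDistinct` under the assumption that `WeightedAvg` computes sum(long * int) / sum(int) with integer division.

```scala
// Hypothetical model of row-count sliding windows for a single group.
// Not Flink's CountSlidingWindowAssigner; all names here are illustrative.
object CountSlidingSketch {

  // One input row, keeping only the two columns the aggregates read ('long, 'int).
  final case class Rec(long: Long, int: Int)

  /** Window ids (0-based index of the window's last row) that row `n` belongs to.
    * A window closes after every `slide` rows and spans at most `size` rows.
    * With emitPartial = false, windows that would hold fewer than `size` rows are skipped. */
  def assign(n: Int, size: Int, slide: Int, emitPartial: Boolean): Seq[Int] =
    (n until n + size)
      .filter(e => (e + 1) % slide == 0)
      .filter(e => emitPartial || e >= size - 1)

  /** Replays one group's rows and returns the contents of every window that fired.
    * Window `e` fires when row `e` arrives, so ids beyond the last row never fire. */
  def replay[T](rows: Seq[T], size: Int, slide: Int, emitPartial: Boolean): Seq[Seq[T]] = {
    val assigned = for {
      (row, n) <- rows.zipWithIndex
      e <- assign(n, size, slide, emitPartial)
      if e < rows.length
    } yield (e, row)
    // groupBy keeps arrival order inside each window; sort windows by id.
    assigned.groupBy(_._1).toSeq.sortBy(_._1).map { case (_, pairs) => pairs.map(_._2) }
  }

  // Assumed WeightedAvg semantics: sum(long * int) / sum(int), integer division.
  def weightedAvg(w: Seq[Rec]): Long =
    w.map(r => r.long * r.int).sum / w.map(_.int.toLong).sum

  def countDistinct(w: Seq[Rec]): Int = w.map(_.long).distinct.size

  // The test's rows, grouped by ('int2, 'int3, 'string) in arrival order.
  val groups: Seq[Seq[Rec]] = Seq(
    Seq(Rec(1L, 1)),              // "Hi"
    Seq(Rec(2L, 2), Rec(4L, 2)),  // "Hello"
    Seq(Rec(8L, 3), Rec(16L, 3))) // "Hello world"

  // Results as "weightedAvg,countDistinct" for Slide over 2.rows every 1.rows.
  def results(emitPartial: Boolean): Seq[String] =
    for {
      g <- groups
      w <- replay(g, size = 2, slide = 1, emitPartial = emitPartial)
    } yield s"${weightedAvg(w)},${countDistinct(w)}"

  def main(args: Array[String]): Unit = {
    println(results(emitPartial = true).sorted.mkString(" "))
    println(results(emitPartial = false).sorted.mkString(" "))
  }
}
```

Under this model, `results(emitPartial = true).sorted` equals `Seq("1,1", "12,2", "2,1", "3,2", "8,1")`, the sorted expected output of the test, while `results(emitPartial = false).sorted` equals `Seq("12,2", "3,2")`, the actual output. In the full-windows-only case the first row of each group is never assigned to window 0, which is exactly the "fewer windows" symptom described in the issue.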
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)