[
https://issues.apache.org/jira/browse/FLINK-13446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895928#comment-16895928
]
Jark Wu commented on FLINK-13446:
---------------------------------
Hi [~twalthr], here is how the blink planner currently assigns row-count
sliding windows. For example, with size=5 and slide=3, we get these windows:
[1,2,3,4,5], [4,5,6,7,8], [7,8,9,10,11] ... (say row_id starts from 1)
In the flink planner (or the DataStream API), the assigned windows are:
[1,2,3], [2,3,4,5,6], [5,6,7,8,9] ... (say row_id starts from 1)
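To make the difference concrete, here is a minimal sketch that enumerates the windows under the two assignment schemes described above. It only models the row ids listed in this comment; `RowCountWindows`, `blinkStyle` and `flinkStyle` are hypothetical names for illustration and are not the actual `CountSlidingWindowAssigner` logic in either planner.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): lists row ids per row-count sliding window. */
final class RowCountWindows {

    /** Blink-style, as described above: window k covers rows [k*slide + 1, k*slide + size]. */
    static List<List<Integer>> blinkStyle(int size, int slide, int count) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int k = 0; k < count; k++) {
            windows.add(range(k * slide + 1, k * slide + size));
        }
        return windows;
    }

    /** Flink-style, as described above: window k ends at row k*slide and keeps at most the last `size` rows. */
    static List<List<Integer>> flinkStyle(int size, int slide, int count) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int k = 1; k <= count; k++) {
            int end = k * slide;
            windows.add(range(Math.max(1, end - size + 1), end));
        }
        return windows;
    }

    /** Inclusive range of row ids, assuming row_id starts from 1. */
    private static List<Integer> range(int from, int to) {
        List<Integer> rows = new ArrayList<>();
        for (int i = from; i <= to; i++) {
            rows.add(i);
        }
        return rows;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3, 4, 5], [4, 5, 6, 7, 8], [7, 8, 9, 10, 11]]
        System.out.println(blinkStyle(5, 3, 3));
        // prints [[1, 2, 3], [2, 3, 4, 5, 6], [5, 6, 7, 8, 9]]
        System.out.println(flinkStyle(5, 3, 3));
    }
}
```

In this sketch the first flink-style window is short because it closes after `slide` rows, before `size` rows have arrived, while the blink-style window waits for a full `size` rows.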
We have two options for this issue:
(1) keep the blink planner behavior and explain it in the docs.
(2) change the blink planner behavior to match the flink planner.
However, IMO, the flink planner behavior might confuse users, who may wonder
why the first 5 elements are not in the same window. So I prefer option (1).
No matter which option we choose, considering that it is a corner use case, I
don't think it's a blocker.
What do you think [~twalthr] [~hequn8128]?
> Row count sliding window outputs incorrectly in blink planner
> -------------------------------------------------------------
>
> Key: FLINK-13446
> URL: https://issues.apache.org/jira/browse/FLINK-13446
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.9.0
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0, 1.10.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> For the blink planner, the row count sliding window produces incorrect output.
> The window assigner assigns fewer windows than expected, so the window
> outputs less data. The bug can be reproduced by the following test:
> {code:java}
> @Test
> def testGroupWindowWithoutKeyInProjection(): Unit = {
>   val data = List(
>     (1L, 1, "Hi", 1, 1),
>     (2L, 2, "Hello", 2, 2),
>     (4L, 2, "Hello", 2, 2),
>     (8L, 3, "Hello world", 3, 3),
>     (16L, 3, "Hello world", 3, 3))
>   val stream = failingDataSource(data)
>   val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3,
>     'proctime.proctime)
>   val weightAvgFun = new WeightedAvg
>   val countDistinct = new CountDistinct
>   val windowedTable = table
>     .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
>     .groupBy('w, 'int2, 'int3, 'string)
>     .select(weightAvgFun('long, 'int), countDistinct('long))
>   val sink = new TestingAppendSink
>   windowedTable.toAppendStream[Row].addSink(sink)
>   env.execute()
>   val expected = Seq("12,2", "8,1", "2,1", "3,2", "1,1")
>   assertEquals(expected.sorted, sink.getAppendResults.sorted)
> }
> {code}
> The expected output is Seq("12,2", "8,1", "2,1", "3,2", "1,1"), while the
> actual output is Seq("12,2", "3,2").
> To fix the problem, we can correct the assignment logic in
> CountSlidingWindowAssigner.assignWindows.