[ https://issues.apache.org/jira/browse/FLINK-5735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546574#comment-16546574 ]
Aljoscha Krettek commented on FLINK-5735:
-----------------------------------------
I think the problem here is that the watermark assigner is not correct. The
timestamps of the events are out of order, so the result depends on how they
are emitted in parallel and how the watermark progresses. Could that be the
case?
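To illustrate the suspected mechanism, here is a minimal plain-Scala sketch
(not Flink code). It assumes that the watermark follows each element, only
moves forward, and that an element is dropped as late once the watermark has
reached the last timestamp of its window (no allowed lateness). The names
LateElementSketch and countInWindow are hypothetical, and only the first
window [0, 5) is modelled.

```scala
// Plain-Scala simulation of the window [0, 5); no Flink dependency.
object LateElementSketch {
  type Event = (Long, Int, String)

  // Same elements, in the same order, as in the issue description.
  val data: List[Event] = List(
    (1L, 1, "Hi"), (2L, 2, "Hallo"), (3L, 2, "Hello"), (6L, 3, "Hello"),
    (4L, 5, "Hello"), (16L, 4, "Hello world"), (8L, 3, "Hello world"))

  // Counts per key for the window [start, end), processing events in
  // arrival order. Assumption: the watermark equals the largest timestamp
  // seen so far and is advanced after each element.
  def countInWindow(events: Seq[Event], start: Long, end: Long): Map[String, Int] = {
    var watermark = Long.MinValue
    var counts = Map.empty[String, Int]
    for ((ts, _, key) <- events) {
      val inWindow = ts >= start && ts < end
      // The window's last timestamp is end - 1; once the watermark has
      // reached it, the window has fired and further elements are late.
      val late = end - 1 <= watermark
      if (inWindow && !late) {
        counts = counts.updated(key, counts.getOrElse(key, 0) + 1)
      }
      watermark = math.max(watermark, ts)
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    // Arrival order as listed: timestamp 6 precedes timestamp 4,
    // so the element with timestamp 4 is late and "Hello" is counted once.
    println(countInWindow(data, 0L, 5L))

    // Hypothetical interleaving in which timestamp 4 overtakes timestamp 6:
    // "Hello" is counted twice.
    val reordered = data.patch(3, Seq(data(4), data(3)), 2)
    println(countInWindow(reordered, 0L, 5L))
  }
}
```

If this model is right, a parallel run can produce either arrival order, which
would explain why the count of "Hello" is sometimes 1 and sometimes 2.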
> Non-overlapping sliding window is not deterministic
> ---------------------------------------------------
>
> Key: FLINK-5735
> URL: https://issues.apache.org/jira/browse/FLINK-5735
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Valerii Florov
> Priority: Major
>
> I don't know if this is a problem of the Table API or the underlying API. We
> have to investigate this as part of the issue.
> The following code produces different results from run to run: the count
> of "Hello" is sometimes 1 and sometimes 2.
> {code}
> val data = List(
>   (1L, 1, "Hi"),
>   (2L, 2, "Hallo"),
>   (3L, 2, "Hello"),
>   (6L, 3, "Hello"),
>   (4L, 5, "Hello"),
>   (16L, 4, "Hello world"),
>   (8L, 3, "Hello world"))
> @Test
> def testEventTimeSlidingWindowNonOverlapping(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   StreamITCase.testResults = mutable.MutableList()
>   val stream = env
>     .fromCollection(data)
>     .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>   val table = stream.toTable(tEnv, 'long, 'int, 'string)
>   val windowedTable = table
>     .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
>     .groupBy('w, 'string)
>     .select('string, 'int.count, 'w.start, 'w.end)
>   val results = windowedTable.toDataStream[Row]
>   results.addSink(new StreamITCase.StringSink)
>   env.execute()
>   val expected = Seq(
>     "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>     "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>     "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
>   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
> }
> class TimestampWithEqualWatermark
>     extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>   override def checkAndGetNextWatermark(
>       lastElement: (Long, Int, String),
>       extractedTimestamp: Long): Watermark = {
>     new Watermark(extractedTimestamp)
>   }
>   override def extractTimestamp(
>       element: (Long, Int, String),
>       previousElementTimestamp: Long): Long = {
>     element._1
>   }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)