[ 
https://issues.apache.org/jira/browse/FLINK-5735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546488#comment-16546488
 ] 

Valerii Florov commented on FLINK-5735:
---------------------------------------

Dear all,

I shortened the example from the issue description and have been experimenting with it.

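{code:java}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// (event timestamp in ms, n, name)
val data = List(
  (3L, 2, "Hello"),
  (6L, 3, "Hello"),
  (4L, 5, "Hello"),
  (13L, 2, "Hello"),
  (16L, 3, "Hello"),
  (14L, 5, "Hello")
)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)

// TimestampAndWatermarkWithOffset is a test helper (not shown); 0L is its offset argument
val stream = env
  .fromCollection(data)
  .assignTimestampsAndWatermarks(
    new TimestampAndWatermarkWithOffset[(Long, Int, String)](0L))

// Append a rowtime attribute after the three physical fields
tEnv.registerDataStream("ttt", stream, 'msec, 'n, 'name, 'rowtime.rowtime)

// HOP(time, slide, size): 5 ms windows starting every 10 ms
val tableResult = tEnv.sqlQuery(
  "SELECT SUM(msec), SUM(n), name FROM ttt GROUP BY " +
    "HOP(rowtime, INTERVAL '0.010' SECOND, INTERVAL '0.005' SECOND), name")
tableResult.toAppendStream[Row].print()
env.execute()
{code}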

Running this code several times produces different results, for example:

4> 7,7,Hello
4> 27,7,Hello

4> 3,2,Hello
4> 27,7,Hello

4> 7,7,Hello
4> 13,2,Hello


I think this shows that the root of the problem lies not in the aggregate functions but in the grouping by window. As you can see, the problem can appear in either window. If the issue were in the metadata, all windows would be corrupt.

I also tried a very similar test using the Table API:

{code:java}
// Same 5 ms windows starting every 10 ms, expressed with the Table API
val windowedTable = table
  .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
  .groupBy('w)
  .select('int.count, 'w.start, 'w.end)
{code}
The results showed the same nondeterminism.

Debugging led me into the Akka actors, and when stepping through the code I could not avoid timeout exceptions.

So, my questions: am I moving in the right direction? And how can I find the class that is actually responsible for grouping by window at runtime?

I would appreciate any advice.

Best regards

Valery

> Non-overlapping sliding window is not deterministic
> ---------------------------------------------------
>
>                 Key: FLINK-5735
>                 URL: https://issues.apache.org/jira/browse/FLINK-5735
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Valerii Florov
>            Priority: Major
>
> I don't know if this is a problem of the Table API or the underlying API. We 
> have to investigate this as part of the issue.
> The following code leads to different results from time to time. Sometimes 
> the count of "Hello" is 1 sometimes 2.
> {code}
>   val data = List(
>     (1L, 1, "Hi"),
>     (2L, 2, "Hallo"),
>     (3L, 2, "Hello"),
>     (6L, 3, "Hello"),
>     (4L, 5, "Hello"),
>     (16L, 4, "Hello world"),
>     (8L, 3, "Hello world"))
>   @Test
>   def testEventTimeSlidingWindowNonOverlapping(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.testResults = mutable.MutableList()
>     val stream = env
>       .fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>     val table = stream.toTable(tEnv, 'long, 'int, 'string)
>     val windowedTable = table
>       .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
>       .groupBy('w, 'string)
>       .select('string, 'int.count, 'w.start, 'w.end)
>     val results = windowedTable.toDataStream[Row]
>     results.addSink(new StreamITCase.StringSink)
>     env.execute()
>     val expected = Seq(
>       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
>     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
>   }
>   class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>     override def checkAndGetNextWatermark(
>         lastElement: (Long, Int, String),
>         extractedTimestamp: Long)
>       : Watermark = {
>       new Watermark(extractedTimestamp)
>     }
>     override def extractTimestamp(
>         element: (Long, Int, String),
>         previousElementTimestamp: Long): Long = {
>       element._1
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
