[ 
https://issues.apache.org/jira/browse/FLINK-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075878#comment-17075878
 ] 

Ori Popowski edited comment on FLINK-16929 at 4/6/20, 7:17 AM:
---------------------------------------------------------------

 # {{SessionStartProcessFunction}} only emits, to a side output, events that mark when the session becomes active/inactive.
 # The main output goes to {{sessionStartSink}}.
 # Here's a version without the split:

{code:java}
KeyedStream<Foo, String> keyedStream = inputStream
        .keyBy(keySelector);

WindowedStream<Foo, String, TimeWindow> windowedStream = keyedStream
        .window(EventTimeSessionWindows.withGap(gap))
        .allowedLateness(allowedLateness);

SingleOutputStreamOperator<SessionEventFlat> sessionAggregation = windowedStream
        .aggregate(new SessionAggregateFunction(false),
                new SessionIncrementalAggregationProcessWindowFunction());

sessionAggregation
        .keyBy(SessionEventFlat::getSId)
        .addSink(sessionsSink)
        .setParallelism(4);
{code}
*But all of the above is irrelevant because I've found the reason for the 
incorrect session cutoff.*

The reason is not the splitting (the side output) as I originally thought, and 
it has nothing to do with Flink 1.9 either (it also happens in 1.5).

We've found out that {{EventTimeSessionWindows}} triggers a new window 
when there are late events. I have no idea why this happens, because:
 # We have an event-time watermark with 5 minutes of out-of-orderness.
 # The session window is configured with an allowed lateness of 0.

According to the docs, late events should be _dropped_. Here it seems that they 
trigger a window. If we understand why this happens, we'll solve the problem.
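
A possible explanation worth checking, stated here as an assumption about the window operator and not something confirmed in this thread: an element is discarded only when the window it is assigned to has already expired, that is, when window end - 1 + allowed lateness <= current watermark. With session windows, each element first gets its own window [timestamp, timestamp + gap), so an element that is behind the watermark by less than the gap would still open a fresh session instead of being dropped. The plain-Java sketch below models only that rule; {{SessionLatenessSketch}} and {{isDropped}} are hypothetical names for illustration, not Flink APIs.

```java
public class SessionLatenessSketch {

    /**
     * Models the assumed drop rule for an element that opens a new session
     * window [eventTs, eventTs + gapMs): it is dropped only if that window's
     * cleanup time is already at or behind the watermark.
     */
    static boolean isDropped(long eventTs, long gapMs, long allowedLatenessMs, long watermark) {
        long windowEnd = eventTs + gapMs;      // exclusive end of the new session window
        long maxTimestamp = windowEnd - 1;     // last timestamp that belongs to the window
        long cleanupTime = maxTimestamp + allowedLatenessMs;
        return cleanupTime <= watermark;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long gap = 30 * minute;                // 30-minute session gap, as in the issue
        long allowedLateness = 0L;             // allowed lateness of 0, as in the issue
        long watermark = 100 * minute;         // arbitrary watermark chosen for the example

        // Event 10 minutes behind the watermark: its window ends after the watermark.
        System.out.println(isDropped(90 * minute, gap, allowedLateness, watermark));

        // Event 40 minutes behind the watermark: its window has already expired.
        System.out.println(isDropped(60 * minute, gap, allowedLateness, watermark));
    }
}
```

If this assumption holds, the late-data side output ({{sideOutputLateData}}) would stay empty for events like the first one, which may be one way to check it.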



> Session Window produces sessions randomly
> -----------------------------------------
>
>                 Key: FLINK-16929
>                 URL: https://issues.apache.org/jira/browse/FLINK-16929
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.9.1
>            Reporter: Ori Popowski
>            Priority: Major
>         Attachments: image-2020-04-01-19-56-00-239.png, 
> image-2020-04-01-19-56-27-720.png
>
>
>  We have a Flink job which keys by session ID (sId) and uses a session window 
> with a 30-minute gap:
> {code:java}
> inputStream
>     .keyBy(keySelector)
>     .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
>     .allowedLateness(Time.seconds(0L))
> {code}
> This Flink job reads from a Kinesis stream.
> Lately (I suspect after upgrading from 1.5.4 to 1.9.1) we have been getting too 
> many sessions, with gaps of several seconds (instead of 30 minutes).
> We have no idea why this is happening and suspect a Flink bug or a state 
> backend bug (we use RocksDB).
> I haven't found any indication in the logs except for some read throughput 
> warnings, which were resolved by a backoff.
> Attached is a table of derived sessions, followed by the raw events.
> *Sessions*
>   !image-2020-04-01-19-56-00-239.png|width=753,height=406!
>  
> *Events*
>  
> !image-2020-04-01-19-56-27-720.png|width=312,height=383!   
>  
>  
>  
>  


