[
https://issues.apache.org/jira/browse/FLINK-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075878#comment-17075878
]
Ori Popowski commented on FLINK-16929:
--------------------------------------
# {{SessionStartProcessFunction}} just outputs, to a side output, events that mark
when the session is active/inactive (see the sketch after the code block below)
# The main output is used to write to {{sessionStartSink}}
# Here's a version without split:
{code:java}
KeyedStream<Foo, String> keyedStream = inputStream
    .keyBy(keySelector);

WindowedStream<Foo, String, TimeWindow> windowedStream = keyedStream
    .window(EventTimeSessionWindows.withGap(gap))
    .allowedLateness(allowedLateness);

SingleOutputStreamOperator<SessionEventFlat> sessionAggregation = windowedStream
    .aggregate(new SessionAggregateFunction(false),
               new SessionIncrementalAggregationProcessWindowFunction());

sessionAggregation
    .keyBy(SessionEventFlat::getSId)
    .addSink(sessionsSink)
    .setParallelism(4);
{code}
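For reference, here is a minimal sketch of what items 1 and 2 mean in code. The {{SessionMarker}} and {{SessionStart}} types and their {{from}} factory methods are hypothetical placeholders; our real classes are more involved:
{code:java}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch only: marker events go to a side output, the main output
// is what ends up in sessionStartSink downstream.
public class SessionStartProcessFunction
        extends KeyedProcessFunction<String, Foo, SessionStart> {

    // Side output carrying "session active/inactive" marker events (item 1).
    public static final OutputTag<SessionMarker> MARKERS =
            new OutputTag<SessionMarker>("session-markers") {};

    @Override
    public void processElement(Foo event, Context ctx, Collector<SessionStart> out)
            throws Exception {
        // Marker event to the side output.
        ctx.output(MARKERS, SessionMarker.from(event));   // hypothetical factory
        // Main output, later routed to sessionStartSink (item 2).
        out.collect(SessionStart.from(event));            // hypothetical factory
    }
}
{code}
Downstream, the marker stream would be retrieved with {{getSideOutput(SessionStartProcessFunction.MARKERS)}} on the operator's output, while the main stream is added to {{sessionStartSink}}.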
*But all of the above is irrelevant, because I've found the reason for the
incorrect session cutoff.*
The reason is not the splitting (the side output), as I originally thought, and
it has nothing to do with Flink 1.9 (it also happens in 1.5).
We've found that {{EventTimeSessionWindows}} triggers a new window
when there are late events. I have no idea why this happens, because:
# We have an event-time watermark with 5 minutes of out-of-orderness.
# The session window is configured with an allowed lateness of 0.
According to the docs, late events should be _dropped_. Here it seems that they
trigger a new window instead. Once we understand why this happens, we can solve the problem.
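To check where the late events actually go, a sketch along these lines (assuming {{Foo#getTimestamp()}} as the event-time accessor; the other identifiers are the ones from the snippets above) keeps the 5-minute out-of-orderness watermark and zero allowed lateness, but routes late data to a side output instead of silently dropping it:
{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Side output tag for events the window operator classifies as late.
OutputTag<Foo> lateTag = new OutputTag<Foo>("late-events") {};

SingleOutputStreamOperator<SessionEventFlat> sessions = inputStream
    // Watermark lagging 5 minutes behind the max seen timestamp.
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Foo>(Time.minutes(5)) {
            @Override
            public long extractTimestamp(Foo event) {
                return event.getTimestamp(); // assumed accessor
            }
        })
    .keyBy(keySelector)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .allowedLateness(Time.seconds(0L))
    // Late events are diverted here instead of being dropped.
    .sideOutputLateData(lateTag)
    .aggregate(new SessionAggregateFunction(false),
               new SessionIncrementalAggregationProcessWindowFunction());

// Inspect (e.g. print or sink) whatever the operator considered late.
DataStream<Foo> lateEvents = sessions.getSideOutput(lateTag);
{code}
If the short extra sessions keep appearing while the late-data side output stays empty, the events in question are not classified as late by the window operator at all, which would point at the watermark rather than the window assigner.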
> Session Window produces sessions randomly
> -----------------------------------------
>
> Key: FLINK-16929
> URL: https://issues.apache.org/jira/browse/FLINK-16929
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.9.1
> Reporter: Ori Popowski
> Priority: Major
> Attachments: image-2020-04-01-19-56-00-239.png,
> image-2020-04-01-19-56-27-720.png
>
>
> We have a Flink job which keys by session ID (sId) and uses a session window
> with a 30-minute gap:
> {code:java}
> inputStream
> .keyBy(keySelector)
> .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
> .allowedLateness(Time.seconds(0L))
> {code}
> This Flink job reads from Kinesis stream.
> Lately (I suspect after upgrading from 1.5.4 to 1.9.1) we get too many
> sessions, with gaps of several seconds (instead of 30 minutes).
> We have no idea why it's happening and suspect a Flink bug or a state backend
> bug (we use RocksDB).
> I haven't found any indication in the logs except for some read throughput
> warnings which were resolved by a backoff.
> Attached is a table of derived sessions, followed by the raw events.
> *Sessions*
> !image-2020-04-01-19-56-00-239.png|width=753,height=406!
>
> *Events*
>
> !image-2020-04-01-19-56-27-720.png|width=312,height=383!