[
https://issues.apache.org/jira/browse/FLINK-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995273#comment-16995273
]
Xintong Song commented on FLINK-13721:
--------------------------------------
My apologies to [~keremulutas].
This is a valid issue; I'm sorry for closing it. I misunderstood what you meant.
And thanks to [~rmetzger] for correcting it.
> BroadcastState should support StateTTL
> --------------------------------------
>
> Key: FLINK-13721
> URL: https://issues.apache.org/jira/browse/FLINK-13721
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream, Runtime / State Backends
> Affects Versions: 1.8.1
> Environment: macOS 10.14.6 running IntelliJ IDEA Ultimate 2019.2,
> Flink version 1.8.1
> Reporter: Kerem Ulutaş
> Priority: Major
> Labels: feature, improvement, newbie
> Attachments: DebugBroadcastStateTTL.java, IntHolder.java,
> StringHolder.java, flink_broadcast_state_ttl_debug.log
>
>
> Hi everyone,
> Sorry if I'm doing anything wrong; this is my first issue in Apache Jira.
> I have a use case that requires two data streams to be cross-joined. To be
> exact, one stream carries location updates from clients and the other carries
> event data with location information. I'm trying to get the events that happen
> within a certain radius of the client location(s).
> Since the streams cannot be related to each other through a common
> partitioning key, I have to broadcast the client updates to all tasks and
> evaluate the radius check for each event.
> The requirement here is that if we don't receive any location updates from a
> client for a certain amount of time, we should consider the client "gone"
> (similar to the rationale stated in the FLINK-3089 description).
> I have attached the sample application classes I used to debug
> {{BroadcastState}} and {{StateTTL}} together.
> The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0"
> got its first event at 17:08:07.67 (so it is expected to be evicted sometime
> after 17:08:37.xxx) but is not evicted.
> For the operator part (the result of
> {{BroadcastConnectedStream<IntHolder, StringHolder>.process}}): since the
> context in the {{onTimer}} method doesn't let the user change the contents of
> the broadcast state, the only way to deal with stale client data is as follows:
> * In the {{processElement}} method, compute results only for client data that
> is newer than the TTL
> * In the {{processBroadcastElement}} method, remove client data older than a
> certain age from the broadcast state
> If the broadcast side of the connected streams receives no data for longer
> than the desired time-to-live, {{BroadcastState}} will hold stale data, and
> the {{processElement}} method has to filter out that client data on every
> call. This is the approach I am using in production right now.
> I don't know whether {{BroadcastState}} already supports {{StateTTL}} and I'm
> just using it incorrectly. I am also not aware of any decision or limitation
> that makes it impossible to implement {{StateTTL}} for {{BroadcastState}}; I
> would appreciate it if someone could explain any that exist.
> Thanks and regards.
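For illustration, the per-event radius check described in the issue could be sketched in plain Java as follows. The issue does not say which distance formula is used, so this assumes great-circle (haversine) distance on a spherical Earth with a mean radius of 6371 km. {{RadiusCheck}}, {{haversineKm}} and {{withinRadius}} are hypothetical names, not Flink APIs. In the setup described above, {{processElement}} would call {{withinRadius}} once per client location held in the broadcast state.

```java
// Hypothetical helper (not part of Flink or the attached classes).
// Assumption: haversine distance on a spherical Earth; the issue does not name a formula.
final class RadiusCheck {
    // Mean Earth radius in kilometres (spherical approximation).
    static final double EARTH_RADIUS_KM = 6371.0;

    private RadiusCheck() {}

    // Great-circle distance in km between two (lat, lon) points given in degrees.
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        // Clamp to 1.0 so floating-point error can never make asin return NaN.
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    // True if the event lies within radiusKm of the client location.
    static boolean withinRadius(double clientLat, double clientLon,
                                double eventLat, double eventLon, double radiusKm) {
        return haversineKm(clientLat, clientLon, eventLat, eventLon) <= radiusKm;
    }
}
```

As a rough check, two points on the equator one degree of longitude apart are about 111.19 km apart, so they pass a 112 km radius check but fail a 111 km one.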
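The workaround from the description (evict on the broadcast side, filter on the event side) can be modelled in plain Java as below. This is a minimal sketch under stated assumptions, not the attached code: a {{HashMap}} stands in for the {{BroadcastState}}, each entry carries the timestamp of the client's last update, and {{ClientLocation}}, {{StaleClientFilter}}, {{onClientUpdate}} and {{freshClients}} are hypothetical names. {{onClientUpdate}} plays the role of {{processBroadcastElement}} (write access, so stale entries are removed) and {{freshClients}} plays the role of {{processElement}} (read-only access, so stale entries can only be skipped).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical value type: one client's last known location and update time.
final class ClientLocation {
    final double lat;
    final double lon;
    final long lastSeenMillis; // time of the client's latest location update

    ClientLocation(double lat, double lon, long lastSeenMillis) {
        this.lat = lat;
        this.lon = lon;
        this.lastSeenMillis = lastSeenMillis;
    }
}

// Hypothetical helper; a plain Map stands in for Flink's BroadcastState.
final class StaleClientFilter {
    private final long ttlMillis;

    StaleClientFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // An entry is fresh if it was updated no more than ttlMillis before nowMillis.
    boolean isFresh(ClientLocation loc, long nowMillis) {
        return nowMillis - loc.lastSeenMillis <= ttlMillis;
    }

    // Broadcast side (processBroadcastElement): store the update, then drop stale entries.
    void onClientUpdate(Map<String, ClientLocation> state, String clientId,
                        ClientLocation loc, long nowMillis) {
        state.put(clientId, loc);
        Iterator<Map.Entry<String, ClientLocation>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            if (!isFresh(it.next().getValue(), nowMillis)) {
                it.remove();
            }
        }
    }

    // Event side (processElement): state is read-only, so stale entries are skipped, not removed.
    List<String> freshClients(Map<String, ClientLocation> state, long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, ClientLocation> e : state.entrySet()) {
            if (isFresh(e.getValue(), nowMillis)) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Assuming a 30-second TTL (the gap between 17:08:07 and 17:08:37 in the log), an entry last updated at t=0 is skipped by {{freshClients}} at t=40s but stays in the map until the next broadcast update arrives. That is exactly the stale-data problem the issue describes when the broadcast side goes quiet.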
--
This message was sent by Atlassian Jira
(v8.3.4#803005)