[
https://issues.apache.org/jira/browse/FLINK-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16980014#comment-16980014
]
Xintong Song commented on FLINK-13721:
--------------------------------------
Hi [~keremulutas],
Thanks for bringing up this discussion.
Here are the contribution guidelines for the Apache Flink community:
[https://flink.apache.org/contributing/how-to-contribute.html]
According to the guidelines, we should first report and discuss the problem and
the proposed solution in a Jira ticket. Once consensus is reached, you can ask
a committer to assign the issue to you. Implementation should start only after
the issue has been assigned. Code contributions are made through GitHub pull
requests.
I'm going to close this ticket for now. Feel free to open a new one, or reopen
and edit this one, if you would still like to discuss and contribute to the
reported issue.
> BroadcastState should support StateTTL
> --------------------------------------
>
> Key: FLINK-13721
> URL: https://issues.apache.org/jira/browse/FLINK-13721
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream, Runtime / State Backends
> Affects Versions: 1.8.1
> Environment: MacOS 10.14.6 running IntelliJ Idea Ultimate 2019.2,
> Flink version 1.8.1
> Reporter: Kerem Ulutaş
> Priority: Major
> Labels: feature, improvement, newbie
> Attachments: DebugBroadcastStateTTL.java, IntHolder.java,
> StringHolder.java, flink_broadcast_state_ttl_debug.log
>
>
> Hi everyone,
> Sorry if I'm doing anything wrong, this is my first issue in Apache Jira.
> I have a use case which requires 2 data streams to be cross joined. To be
> exact, one stream is location updates from clients and the other stream is
> event data with location information. I'm trying to get events that happen
> within a certain radius of client location(s).
> Since the streams cannot be related to each other through a common
> partitioning key, I have to broadcast client updates to all tasks and
> evaluate the radius check for each event.
> The requirement here is that if we don't receive any location updates from a
> client for a certain amount of time, we should consider the client "gone"
> (similar to the rationale stated in the FLINK-3089 description).
> I have attached the sample application classes I used to debug
> {{BroadcastState}} and {{StateTTL}} together.
> The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0"
> got its first event at 17:08:07.67 (so it was expected to be evicted sometime
> after 17:08:37.xxx) but is never evicted.
> For the operator part (the result of
> {{BroadcastConnectedStream<IntHolder, StringHolder>.process}}): since the
> context in the {{onTimer}} method does not let the user change the contents
> of the broadcast state, the only way to deal with stale client data is as
> follows:
> * In the {{processElement}} method, calculate the result only for client
> data that is newer than the TTL
> * In the {{processBroadcastElement}} method, remove client data older than a
> certain amount of time from the broadcast state
> If the broadcast side of the connected streams receives no data for longer
> than the desired time-to-live, {{BroadcastState}} will hold stale data and
> the {{processElement}} method has to filter out that client data each
> time. This is the approach I am using in production right now.
> I don't know whether {{BroadcastState}} already supports {{StateTTL}} and I'm
> just using it incorrectly. I am also not aware of any decision or limitation
> that makes it impossible to implement {{StateTTL}} for {{BroadcastState}}; I
> would appreciate it if someone could explain any that exist.
> Thanks and regards.
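
For readers following the quoted description, the manual expiry workaround
(filter stale entries on the event side, remove them on the broadcast side) can
be sketched roughly as below. This is a Flink-free model, not the reporter's
attached code: the TreeMap stands in for the broadcast state, and the class,
method, and field names are all hypothetical. The two methods only mirror the
roles of processBroadcastElement and processElement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Flink-free model of the manual TTL workaround. */
class ManualBroadcastTtl {

    /** A client's last reported position and the time it was reported. */
    static final class ClientLocation {
        final double x;
        final double y;
        final long updatedAtMillis;

        ClientLocation(double x, double y, long updatedAtMillis) {
            this.x = x;
            this.y = y;
            this.updatedAtMillis = updatedAtMillis;
        }
    }

    // Stand-in for the broadcast state (client id -> last location).
    private final Map<String, ClientLocation> clients = new TreeMap<>();
    private final long ttlMillis;

    ManualBroadcastTtl(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Broadcast side: store the update, then drop entries older than the TTL. */
    void onClientUpdate(String clientId, double x, double y, long nowMillis) {
        clients.put(clientId, new ClientLocation(x, y, nowMillis));
        clients.values().removeIf(c -> nowMillis - c.updatedAtMillis > ttlMillis);
    }

    /**
     * Event side: the state is treated as read-only here, so stale entries
     * are skipped rather than removed.
     */
    List<String> clientsNear(double x, double y, double radius, long nowMillis) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, ClientLocation> entry : clients.entrySet()) {
            ClientLocation c = entry.getValue();
            boolean fresh = nowMillis - c.updatedAtMillis <= ttlMillis;
            boolean inRange = Math.hypot(c.x - x, c.y - y) <= radius;
            if (fresh && inRange) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }

    /** Number of entries currently held, including stale ones. */
    int storedClients() {
        return clients.size();
    }
}
```

With a 30-second TTL, a client that stops sending updates is no longer
returned by clientsNear once it is stale, but it stays in the map until some
other client update arrives. That is the stale-data cost described in the
quoted text.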