[ https://issues.apache.org/jira/browse/FLINK-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995273#comment-16995273 ]

Xintong Song commented on FLINK-13721:
--------------------------------------

My apologies to [~keremulutas].
This is a valid issue; sorry for closing it. I misunderstood what you meant.

And thanks [~rmetzger] for correcting it.

> BroadcastState should support StateTTL
> --------------------------------------
>
>                 Key: FLINK-13721
>                 URL: https://issues.apache.org/jira/browse/FLINK-13721
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream, Runtime / State Backends
>    Affects Versions: 1.8.1
>         Environment: MacOS 10.14.6 running IntelliJ Idea Ultimate 2019.2, 
> Flink version 1.8.1
>            Reporter: Kerem Ulutaş
>            Priority: Major
>              Labels: feature, improvement, newbie
>         Attachments: DebugBroadcastStateTTL.java, IntHolder.java, 
> StringHolder.java, flink_broadcast_state_ttl_debug.log
>
>
> Hi everyone,
> Sorry if I'm doing anything wrong; this is my first issue in Apache Jira.
> I have a use case that requires two data streams to be cross-joined. To be 
> exact, one stream carries location updates from clients and the other carries 
> event data with location information. I'm trying to find events that happen 
> within a certain radius of the client location(s).
> Since the streams cannot be related to each other through a common 
> partitioning key, I have to broadcast the client updates to all tasks and 
> run the radius check for each event.
> The requirement is that if we don't receive any location update from a 
> client for a certain amount of time, we should consider that client "gone" 
> (similar to the rationale in the FLINK-3089 description).
> I have attached the sample application classes I used to debug 
> {{BroadcastState}} and {{StateTTL}} together.
> The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0" 
> got its first event at 17:08:07.67 and was expected to be evicted sometime 
> after 17:08:37.xxx, but it is not evicted.
> For the operator (the result of 
> {{BroadcastConnectedStream<IntHolder, StringHolder>.process}}): since the 
> context in the {{onTimer}} method doesn't let the user change the contents of 
> the broadcast state, the only way to deal with stale client data is as 
> follows:
>  * In the {{processElement}} method, calculate results only for client data 
> that is newer than the TTL.
>  * In the {{processBroadcastElement}} method, remove client data older than a 
> certain amount of time from the broadcast state.
> If the broadcast side of the connected streams doesn't receive data for 
> longer than the desired time-to-live, {{BroadcastState}} will hold stale 
> data and the {{processElement}} method has to filter out that client data 
> every time. This is the method I am using in production right now.
> I don't know whether {{BroadcastState}} already supports {{StateTTL}} and I'm 
> simply using it incorrectly. I am also not aware of any decision or 
> limitation that makes it impossible to implement {{StateTTL}} for 
> {{BroadcastState}}; I would be pleased if someone could explain any that 
> exist.
> Thanks and regards.
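The workaround described in the issue can be modeled outside Flink as a small plain-Java class. This is a minimal sketch under stated assumptions, not Flink API code: a `TreeMap` stands in for `BroadcastState`, the names `ClientTable`, `onLocationUpdate` and `clientsNear` are hypothetical, and the haversine great-circle distance is an assumed radius metric, since the issue does not specify one. It illustrates the drawback the reporter points out: stale entries are only evicted when the broadcast side receives data, so the event side has to re-check freshness on every read.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the workaround (not Flink API).
// The TreeMap stands in for BroadcastState: clientId -> last known location.
class ClientTable {
    static final double EARTH_RADIUS_KM = 6371.0;

    static final class Location {
        final double lat;
        final double lon;
        final long updatedAtMillis; // time of the client's last location update

        Location(double lat, double lon, long updatedAtMillis) {
            this.lat = lat;
            this.lon = lon;
            this.updatedAtMillis = updatedAtMillis;
        }
    }

    private final long ttlMillis;
    // Sorted map so that clientsNear returns client IDs in a deterministic order.
    private final Map<String, Location> clients = new TreeMap<>();

    ClientTable(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Broadcast side (cf. processBroadcastElement): upsert the update, then
    // evict every entry older than the TTL. Eviction only happens here.
    void onLocationUpdate(String clientId, double lat, double lon, long nowMillis) {
        clients.put(clientId, new Location(lat, lon, nowMillis));
        Iterator<Location> it = clients.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().updatedAtMillis > ttlMillis) {
                it.remove();
            }
        }
    }

    // Event side (cf. processElement, whose context only offers read-only access
    // to the broadcast state): stale entries are skipped, not removed.
    List<String> clientsNear(double eventLat, double eventLon, double radiusKm, long nowMillis) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Location> e : clients.entrySet()) {
            Location loc = e.getValue();
            boolean fresh = nowMillis - loc.updatedAtMillis <= ttlMillis;
            if (fresh && haversineKm(loc.lat, loc.lon, eventLat, eventLon) <= radiusKm) {
                matches.add(e.getKey());
            }
        }
        return matches;
    }

    int storedClients() {
        return clients.size();
    }

    // Great-circle distance in kilometres between two (lat, lon) points in degrees.
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }
}
```

For example, with a 30-second TTL (which appears consistent with the "c0" timeline in the attached log), a client whose last update is 35 seconds old is skipped by `clientsNear` but stays stored until the next `onLocationUpdate` call. A native TTL for `BroadcastState` would remove the need for that second, read-time check.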



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
