[ https://issues.apache.org/jira/browse/FLINK-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-13721:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor feature 
improvement newbie  (was: auto-deprioritized-major feature improvement newbie 
stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> BroadcastState should support StateTTL
> --------------------------------------
>
>                 Key: FLINK-13721
>                 URL: https://issues.apache.org/jira/browse/FLINK-13721
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream, Runtime / State Backends
>    Affects Versions: 1.8.1
>         Environment: MacOS 10.14.6 running IntelliJ Idea Ultimate 2019.2, 
> Flink version 1.8.1
>            Reporter: Kerem Ulutaş
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> feature, improvement, newbie
>         Attachments: DebugBroadcastStateTTL.java, IntHolder.java, 
> StringHolder.java, flink_broadcast_state_ttl_debug.log
>
>
> Hi everyone,
> Sorry if I'm doing anything wrong; this is my first issue in Apache Jira.
> I have a use case that requires two data streams to be cross-joined. To be 
> exact, one stream is location updates from clients and the other stream is 
> event data with location information. I'm trying to get events that happen 
> within a certain radius of the client location(s).
> Since the streams cannot be related to each other by a common key for 
> partitioning, I have to broadcast client updates to all tasks and evaluate 
> the radius check for each event.
> The requirement here is that if we don't receive any location updates from a 
> client for a certain amount of time, we should consider the client "gone" 
> (similar to the rationale stated in the FLINK-3089 description).
> I have attached the sample application classes I used to debug 
> {{BroadcastState}} and {{StateTTL}} together.
> The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0" 
> got its first event at 17:08:07.67 (expected to be evicted sometime after 
> 17:08:37.xxx) but doesn't get evicted.
> For the operator part (the result of 
> {{BroadcastConnectedStream<IntHolder, StringHolder>.process}}), since the 
> context in the {{onTimer}} method doesn't let the user change the contents of 
> the broadcast state, the only way to deal with stale client data is as follows:
>  * In the {{processElement}} method, calculate results only for client data 
> received within the TTL
>  * In the {{processBroadcastElement}} method, remove client data older than a 
> certain amount of time from the broadcast state
> If the broadcast side of the connected streams doesn't receive data for longer 
> than the desired time-to-live, {{BroadcastState}} will hold stale data and the 
> {{processElement}} method has to filter out that client data every time. This 
> is the method I am using in production right now.
> I am not sure whether {{BroadcastState}} already supports {{StateTTL}} and I'm 
> simply using it incorrectly. I am also not aware of any decision or limitation 
> that makes it impossible to implement {{StateTTL}} for {{BroadcastState}}; I 
> would appreciate it if someone could explain any that exist.
> Thanks and regards.
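
The radius check from the description, evaluated for each event against every client held in broadcast state, could look roughly like the sketch below. It is a hypothetical plain-Java helper, not code from the attachments or the Flink API: {{RadiusCheck}}, {{haversineKm}} and {{clientsNear}} are illustrative names. It assumes locations are latitude/longitude pairs in degrees and uses the haversine great-circle distance with an assumed mean Earth radius of 6371 km. The map argument stands in for the broadcast state contents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: haversine-based radius check (not part of the Flink API). */
final class RadiusCheck {
    // Assumed mean Earth radius in km; ellipsoid effects are ignored.
    static final double EARTH_RADIUS_KM = 6371.0;

    private RadiusCheck() {}

    /** Great-circle distance in km between two points given in degrees. */
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double sinLat = Math.sin(dLat / 2);
        double sinLon = Math.sin(dLon / 2);
        double a = sinLat * sinLat
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * sinLon * sinLon;
        // Clamp 1 - a at zero to guard against rounding error for near-antipodal points.
        return 2 * EARTH_RADIUS_KM * Math.atan2(Math.sqrt(a), Math.sqrt(Math.max(0.0, 1 - a)));
    }

    /**
     * Returns the ids of clients within radiusKm of the event, in the map's iteration order.
     * The map stands in for the broadcast state: client id -> {lat, lon}.
     */
    static List<String> clientsNear(Map<String, double[]> clients,
                                    double eventLat, double eventLon, double radiusKm) {
        List<String> near = new ArrayList<>();
        for (Map.Entry<String, double[]> e : clients.entrySet()) {
            double[] loc = e.getValue();
            if (haversineKm(loc[0], loc[1], eventLat, eventLon) <= radiusKm) {
                near.add(e.getKey());
            }
        }
        return near;
    }
}
```

Because the check runs against every broadcast client for every event, its cost grows with the number of clients in state, which is one more reason to keep stale clients out of that state.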

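The workaround described in the issue amounts to lazy expiry: the read-only side ({{processElement}}) filters stale entries, and only the broadcast side ({{processBroadcastElement}}) deletes them. The sketch below models that logic in plain Java rather than Flink code. {{StaleClientFilter}}, {{onClientUpdate}} and {{liveClients}} are hypothetical names, and a {{TreeMap}} stands in for the broadcast {{MapState}}. It also assumes the purge uses the incoming update's timestamp as "now" instead of wall-clock time, so that every parallel instance would apply the same change to its copy of the broadcast state.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of the lazy-expiry workaround (not Flink code). */
final class StaleClientFilter {
    private final long ttlMillis;
    // Stand-in for the broadcast state: client id -> timestamp of the last location update.
    // TreeMap is used only for a stable, sorted iteration order.
    private final Map<String, Long> lastSeen = new TreeMap<>();

    StaleClientFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Broadcast side (like processBroadcastElement): may mutate, so record and then purge. */
    void onClientUpdate(String clientId, long timestampMillis) {
        lastSeen.put(clientId, timestampMillis);
        // Assumption: the update's own timestamp serves as "now" for the purge.
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (isExpired(it.next().getValue(), timestampMillis)) {
                it.remove();
            }
        }
    }

    /** Element side (like processElement): read-only, so stale entries are skipped, not removed. */
    List<String> liveClients(long nowMillis) {
        List<String> live = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (!isExpired(e.getValue(), nowMillis)) {
                live.add(e.getKey());
            }
        }
        return live;
    }

    /** Number of entries still held, including stale ones not yet purged. */
    int storedCount() {
        return lastSeen.size();
    }

    private boolean isExpired(long lastSeenMillis, long nowMillis) {
        return nowMillis - lastSeenMillis > ttlMillis;
    }
}
```

With a 30-second TTL, a client last updated at t=0 stops appearing in {{liveClients}} after t=30s but stays in the map until the next broadcast update triggers a purge. This is the stale-data situation the issue describes when the broadcast side goes quiet.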


--
This message was sent by Atlassian Jira
(v8.20.1#820001)
