[ 
https://issues.apache.org/jira/browse/SPARK-19838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-19838.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 2.2.0

https://github.com/apache/spark/pull/17179

> Adding Processing Time based timeout
> ------------------------------------
>
>                 Key: SPARK-19838
>                 URL: https://issues.apache.org/jira/browse/SPARK-19838
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>             Fix For: 2.2.0
>
>
> When a key does not get any new data in mapGroupsWithState, the mapping 
> function is never called on it. So we need a timeout feature that calls the 
> function again in such cases, so that the user can decide whether to continue 
> waiting or clean up (remove state, save stuff externally, etc.).
> Timeouts can be based on either processing time or event time. This JIRA 
> covers processing time, but defines the high-level API design for both. The 
> usage would look like this:
> {code}
> def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = {
>   ...
>   state.setTimeoutDuration(10000)
>   ...
> }
> dataset                        // type is Dataset[V]
>   .groupByKey[K](keyingFunc)   // generates KeyValueGroupedDataset[K, V]
>   .mapGroupsWithState[S, U](
>      func = stateFunction, 
>      timeout = KeyedStateTimeout.withProcessingTime)  // returns Dataset[U]
> {code}
> Note the following design aspects. 
> - The timeout type is passed to mapGroupsWithState as a parameter that is 
> global to all keys. This lets the planner know the timeout type at 
> planning time and optimize execution accordingly, based on whether it needs 
> to save extra info in state (e.g. timeout durations or timestamps).
> - The exact timeout duration is set inside the function call so that it 
> can be customized on a per-key basis.
> - When the timeout occurs for a key, the function is called with no values, 
> and {{KeyedState.isTimingOut()}} set to {{true}}.
> - The timeout is reset for a key every time the function is called on that 
> key, that is, when the key has new data or the key has timed out. So the user 
> has to set the timeout duration every time the function is called; otherwise 
> no timeout will be set.
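
The design aspects above can be illustrated with a small, Spark-free toy model. This is only a sketch of the described semantics, not Spark's implementation: every name in it (ToyTimeout, ToyState, ToyEngine, runBatch, countingFunc) is hypothetical, and it assumes that the duration is in milliseconds and that timeouts are checked once per batch against a caller-supplied processing time.

```scala
import scala.collection.mutable

// Toy model of per-key processing-time timeouts. All names are hypothetical;
// this is NOT the Spark API, only an illustration of the semantics above.
object ToyTimeout {

  final class ToyState[S](var value: Option[S], val isTimingOut: Boolean) {
    // Timeout requested during the current call; None means "no timeout set".
    var timeoutMs: Option[Long] = None
    def setTimeoutDuration(ms: Long): Unit = timeoutMs = Some(ms)
    def remove(): Unit = value = None
  }

  final class ToyEngine[K, V, S, U](func: (K, Iterator[V], ToyState[S]) => U) {
    private val values = mutable.Map.empty[K, S]
    private val deadlines = mutable.Map.empty[K, Long]

    private def call(key: K, data: Iterator[V], timingOut: Boolean, now: Long): U = {
      val state = new ToyState[S](values.get(key), timingOut)
      deadlines.remove(key) // the timeout is reset on every call
      val out = func(key, data, state)
      state.value match {
        case Some(v) => values(key) = v
        case None    => values.remove(key)
      }
      // A new deadline exists only if the function set one during this call.
      state.timeoutMs.foreach(ms => deadlines(key) = now + ms)
      out
    }

    // Process one batch at processing time `now` (assumed milliseconds).
    def runBatch(now: Long, batch: Map[K, Seq[V]]): Seq[U] = {
      val withData = batch.toSeq.map { case (k, vs) =>
        call(k, vs.iterator, timingOut = false, now)
      }
      // Keys without new data whose deadline has passed get a call with no values.
      val expired = deadlines.toSeq.collect {
        case (k, d) if d <= now && !batch.contains(k) => k
      }
      val timedOut = expired.map(k => call(k, Iterator.empty, timingOut = true, now))
      withData ++ timedOut
    }
  }

  // Example user function: count values per key, clean up on timeout.
  def countingFunc(key: String, data: Iterator[Int], state: ToyState[Int]): String =
    if (state.isTimingOut) {
      state.remove() // no timeout is set here, so no further timeout calls happen
      s"$key timed out"
    } else {
      val count = state.value.getOrElse(0) + data.size
      state.value = Some(count)
      state.setTimeoutDuration(10000) // must be set again on every call
      s"$key -> $count"
    }

  def main(args: Array[String]): Unit = {
    val engine = new ToyEngine[String, Int, Int, String](countingFunc)
    println(engine.runBatch(0L, Map("a" -> Seq(1, 2)))) // key "a" gets data
    println(engine.runBatch(5000L, Map.empty))          // before the deadline
    println(engine.runBatch(10000L, Map.empty))         // deadline reached
    println(engine.runBatch(30000L, Map.empty))         // nothing left to time out
  }
}
```

In this sketch the timeout type is fixed for the whole engine, while the duration is chosen per key inside the function. Because the engine clears the deadline before each call, a function that does not call setTimeoutDuration leaves the key without any timeout.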


