[ https://issues.apache.org/jira/browse/SPARK-19838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tathagata Das resolved SPARK-19838.
-----------------------------------
Resolution: Fixed
Fix Version/s: 2.2.0
https://github.com/apache/spark/pull/17179
> Adding Processing Time based timeout
> ------------------------------------
>
> Key: SPARK-19838
> URL: https://issues.apache.org/jira/browse/SPARK-19838
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Tathagata Das
> Assignee: Tathagata Das
> Fix For: 2.2.0
>
>
> When a key does not receive any new data in mapGroupsWithState, the mapping
> function is never called for it. We therefore need a timeout feature that calls
> the function again in such cases, so that the user can decide whether to keep
> waiting or to clean up (remove the state, save data externally, etc.).
> Timeouts can be based on either processing time or event time. This JIRA
> covers processing time, but it defines the high-level API design for both.
> The usage would look like this:
> {code}
> def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = {
> ...
> state.setTimeoutDuration(10000)
> ...
> }
> dataset // type is Dataset[T]
> .groupByKey[K](keyingFunc) // generates KeyValueGroupedDataset[K, T]
> .mapGroupsWithState[S, U](
> func = stateFunction,
> timeout = KeyedStateTimeout.withProcessingTime) // returns Dataset[U]
> {code}
> Note the following design aspects.
> - The timeout type is passed to mapGroupsWithState as a single parameter
> that applies to all keys. This lets the planner know it at planning time and
> optimize the execution accordingly, depending on whether extra information
> (e.g. timeout durations or timestamps) must be saved in the state.
> - The exact timeout duration is provided inside the function call so that it
> can be customized on a per key basis.
> - When the timeout occurs for a key, the function is called with no values,
> and {{KeyedState.isTimingOut()}} set to {{true}}.
> - The timeout for a key is reset every time the function is called on that
> key, that is, when the key has new data or when it has timed out. So the user
> has to set the timeout duration every time the function is called; otherwise,
> no timeout will be set.
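A Spark-free Scala sketch of the processing-time semantics listed above follows, assuming a single process that runs one trigger at a time against an explicit clock. `SimState` (a stand-in for the proposed `KeyedState`), `TimeoutSim`, `Entry`, `runBatch`, `countFn`, and `demo` are hypothetical names made up for illustration; they are not Spark APIs, and Spark's actual implementation, which keeps this bookkeeping in its state store, may differ. The sketch shows a per-key duration set inside the function, the timeout call with an empty iterator and `isTimingOut = true`, and the reset of the timeout on every call.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the proposed KeyedState[S]; not a Spark class.
final class SimState[S](var value: Option[S], val isTimingOut: Boolean) {
  private var timeoutMs: Option[Long] = None
  // Timeout duration for this key, relative to the current processing time.
  def setTimeoutDuration(ms: Long): Unit = timeoutMs = Some(ms)
  def remove(): Unit = value = None
  def timeoutDuration: Option[Long] = timeoutMs
}

object TimeoutSim {
  // Stored per-key state plus an absolute processing-time deadline, if one was set.
  final case class Entry[S](state: S, deadline: Option[Long])

  // Runs one trigger: calls func for every key with new data, then for every other
  // key whose deadline has passed (with an empty iterator and isTimingOut = true).
  def runBatch[K, V, S, U](
      nowMs: Long,
      newData: Map[K, Seq[V]],
      store: mutable.Map[K, Entry[S]],
      func: (K, Iterator[V], SimState[S]) => U): List[U] = {
    val timedOut: List[K] = store.toList.collect {
      case (k, Entry(_, Some(deadline))) if !newData.contains(k) && nowMs >= deadline => k
    }
    val calls: List[(K, Iterator[V], Boolean)] =
      newData.toList.map { case (k, vs) => (k, vs.iterator, false) } ++
        timedOut.map(k => (k, Iterator.empty[V], true))
    calls.map { case (k, values, timingOut) =>
      val state = new SimState[S](store.get(k).map(_.state), timingOut)
      val out = func(k, values, state)
      // The timeout is reset on every call: only a duration set during this call is kept.
      state.value match {
        case Some(s) => store(k) = Entry(s, state.timeoutDuration.map(nowMs + _))
        case None    => store.remove(k)
      }
      out
    }
  }

  // Hypothetical state function: counts events per key; on timeout, emits a summary
  // and removes the state. The 10000 ms duration is set again on every normal call.
  def countFn(key: String, values: Iterator[Int], state: SimState[Int]): String =
    if (state.isTimingOut) {
      val total = state.value.getOrElse(0)
      state.remove()
      s"$key expired with $total"
    } else {
      val total = state.value.getOrElse(0) + values.size
      state.value = Some(total)
      state.setTimeoutDuration(10000L)
      s"$key count $total"
    }

  // Three triggers at processing times 0, 5000 and 12000 ms.
  def demo(): List[List[String]] = {
    val store = mutable.Map.empty[String, Entry[Int]]
    List(
      runBatch(0L, Map("a" -> Seq(1, 2), "b" -> Seq(3)), store, countFn),
      runBatch(5000L, Map("a" -> Seq(4)), store, countFn),
      runBatch(12000L, Map.empty[String, Seq[Int]], store, countFn))
  }
}
```

In `demo()`, key `b` receives no data after the first trigger, so at 12000 ms (past its 10000 ms deadline) the function is called for it with `isTimingOut = true`, while key `a` stays alive because each call with new data pushes its deadline forward. A key whose function never calls `setTimeoutDuration` is stored without a deadline and is never timed out.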