[
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958403#comment-15958403
]
Amit Sela commented on SPARK-19067:
-----------------------------------
[~tdas] should I open a ticket, for future improvements, about allowing
timeouts to fire even if there's no data in the pipeline?
As I mentioned in the PR comments:
Using the {{EventTime}} timeout in the future, I assume the "clock" would be
watermark based instead of wall-time, and I see at least two use-cases where
this would matter:
# Testing - being able to move the clock forward to end-of-time to force firing
everything that is still waiting for its window to close.
# A pipeline where there is a filter before the stateful operator, such that
there is data and the watermark advances, but some of the events are dropped
and never reach the stateful operator. The operator then holds off firing until
"proper" data (data that passes the filter) comes along, which again could
cause an unknown delay in emitting results from the stateful operator.
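
To make the second use-case concrete, here is a toy model in plain Scala with no Spark dependency. It is only a sketch under my own assumptions: {{Event}}, {{WatermarkTimeoutSketch}} and the {{fireOnlyWithData}} flag are hypothetical names, and the "watermark" is simply the maximum event time seen so far, which is not necessarily how Spark computes it.

```scala
// Hypothetical model, not Spark code: every name here is illustrative.
final case class Event(key: String, eventTimeMs: Long, keep: Boolean)

object WatermarkTimeoutSketch {
  // Returns, per batch, the set of keys whose timeout fired.
  // fireOnlyWithData = true models an operator that checks timeouts only
  // when at least one row survives the upstream filter.
  def run(
      batches: Seq[Seq[Event]],
      timeoutMs: Long,
      fireOnlyWithData: Boolean): Seq[Set[String]] = {
    var watermark = 0L
    var deadlines = Map.empty[String, Long] // key -> event-time deadline
    batches.map { batch =>
      // Assumption: the watermark advances on every event, filtered or not.
      if (batch.nonEmpty) {
        watermark = math.max(watermark, batch.map(_.eventTimeMs).max)
      }
      val passed = batch.filter(_.keep)
      // Rows that reach the stateful operator (re)arm the timeout for their key.
      passed.foreach(e => deadlines += e.key -> (e.eventTimeMs + timeoutMs))
      if (fireOnlyWithData && passed.isEmpty) {
        Set.empty[String] // nothing reached the operator, so nothing fires
      } else {
        val expired = deadlines.collect { case (k, d) if d <= watermark => k }.toSet
        deadlines = deadlines -- expired
        expired
      }
    }
  }
}
```

With three single-event batches, ("a", 1000, kept), ("b", 5000, dropped) and ("c", 6000, kept), and a 500 ms timeout, the data-driven variant fires "a" only in the third batch, while the watermark-driven variant fires it in the second, as soon as the watermark passes the deadline.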
> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
> Fix For: 2.2.0
>
>
> Right now the only way to do stateful operations is with Aggregator or
> UDAF. However, this does not give users control of emission or expiration of
> state, making it hard to implement things like sessionization. We should add
> a more general construct (probably similar to {{DStream.mapWithState}}) to
> structured streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> // ------------ New methods on KeyValueGroupedDataset ------------
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
>   // Java friendly
>   def mapGroupsWithState[S, U](
>       func: MapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
>   def flatMapGroupsWithState[S, U](
>       func: FlatMapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
> }
> // ------------------- New Java-friendly function classes -------------------
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> // ---------------------- Wrapper class for state data ----------------------
> trait GroupState[S] {
> def exists(): Boolean
> def get(): S // throws an exception if the state does not exist
> def getOption(): Option[S]
> def update(newState: S): Unit
> def remove(): Unit // exists() will be false after this
> }
> {code}
> Key semantics of the GroupState class
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and
> getOption will return None.
> - After state.update(newState) is called, state.exists() will
> return true, and getOption will return Some(...).
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String], runningCount: GroupState[Long]) => {
> val newCount = words.size + runningCount.getOption.getOrElse(0L)
> runningCount.update(newCount)
> (word, newCount)
> }
> dataset                                                // type is Dataset[String]
>   .groupByKey[String](w => w)                          // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout-based expiration of state that has not received data for a while -
> Done
> - General expression-based expiration - TODO. Any real use cases that cannot
> be done with timeouts?
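
For anyone who wants to try the quoted semantics without a Spark cluster, the following is a minimal in-memory sketch. The {{GroupState}} trait is copied from the proposal above; {{InMemoryGroupState}}, {{RunningCountSketch}} and its {{process}} driver are hypothetical stand-ins of my own and are not part of the proposed API. The driver only imitates per-key state being carried from one batch to the next.

```scala
// Trait copied from the proposal; everything else is a hypothetical stand-in.
trait GroupState[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
}

// Deliberately not thread-safe, matching the stated semantics.
final class InMemoryGroupState[S](initial: Option[S]) extends GroupState[S] {
  private var value: Option[S] = initial
  def exists(): Boolean = value.isDefined
  def get(): S =
    value.getOrElse(throw new NoSuchElementException("state does not exist"))
  def getOption(): Option[S] = value
  def update(newState: S): Unit = { value = Some(newState) }
  def remove(): Unit = { value = None }
}

object RunningCountSketch {
  // Same shape as the usage example: add this batch's occurrences to the count.
  val stateFunc = (word: String, words: Iterator[String], runningCount: GroupState[Long]) => {
    val newCount = words.size + runningCount.getOption().getOrElse(0L)
    runningCount.update(newCount)
    (word, newCount)
  }

  // Hypothetical driver: groups each batch by word and keeps one state per key.
  def process(batches: Seq[Seq[String]]): Seq[Map[String, Long]] = {
    val states = scala.collection.mutable.Map.empty[String, InMemoryGroupState[Long]]
    batches.map { batch =>
      batch.groupBy(identity).map { case (word, occurrences) =>
        val state = states.getOrElseUpdate(word, new InMemoryGroupState[Long](None))
        stateFunc(word, occurrences.iterator, state)
      }
    }
  }
}
```

Calling {{RunningCountSketch.process(Seq(Seq("a", "b", "a"), Seq("a")))}} yields a count of 2 for "a" and 1 for "b" after the first batch, and 3 for "a" after the second, because the state for "a" survives between batches.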
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)