[
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963353#comment-15963353
]
Michael Armbrust commented on SPARK-19067:
------------------------------------------
No, this will be available in Spark 2.2.0
> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
> Fix For: 2.2.0
>
>
> Right now the only way to do stateful operations is with Aggregator or
> UDAF. However, this does not give users control over the emission or
> expiration of state, making it hard to implement things like sessionization.
> We should add a more general construct (probably similar to
> {{DStream.mapWithState}}) to Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following:
>   - Access the input rows corresponding to a key
>   - Access the previous state corresponding to a key
>   - Optionally, update or remove the state
>   - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> // ------------ New methods on KeyValueGroupedDataset ------------
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], GroupState[S]) => U)
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], GroupState[S]) => Iterator[U])
>   // Java friendly
>   def mapGroupsWithState[S, U](
>       func: MapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>   def flatMapGroupsWithState[S, U](
>       func: FlatMapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S], resultEncoder: Encoder[U])
> }
> // ------------------- New Java-friendly function classes -------------------
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> // ---------------------- Wrapper class for state data ----------------------
> trait GroupState[S] {
>   def exists(): Boolean
>   def get(): S                   // throws Exception if state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit             // exists() will be false after this
> }
> {code}
> Key Semantics of the GroupState class
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and
> getOption will return None.
> - After state.update(newState) is called, state.exists() will return true,
> and getOption will return Some(...).
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String],
>     runningCount: GroupState[Long]) => {
>   val newCount = words.size + runningCount.getOption.getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
> dataset                                                 // type is Dataset[String]
>   .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout-based expiration of state that has not received data for a while -
> Done
> - General expression-based expiration - TODO. Are there any real use cases
> that cannot be done with timeouts?
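
The GroupState semantics and the word-count usage in the quoted description can be tried out without Spark. What follows is only a minimal sketch under stated assumptions, not Spark's implementation: InMemoryGroupState, GroupStateSketch, and runBatch are hypothetical names introduced for illustration, and a plain mutable.Map stands in for whatever state store the engine would actually use between batches.

```scala
import scala.collection.mutable

// Trait as given in the proposed API of the quoted description.
trait GroupState[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
}

// Hypothetical in-memory implementation, for illustration only.
// Like the proposal, it makes no attempt at thread safety.
final class InMemoryGroupState[S](initial: Option[S]) extends GroupState[S] {
  private var value: Option[S] = initial
  def exists(): Boolean = value.isDefined
  def get(): S =
    value.getOrElse(throw new NoSuchElementException("state does not exist"))
  def getOption(): Option[S] = value
  def update(newState: S): Unit = { value = Some(newState) }
  def remove(): Unit = { value = None }
}

object GroupStateSketch {
  // Same logic as stateFunc in the Usage section: a running count per word.
  val stateFunc: (String, Iterator[String], GroupState[Long]) => (String, Long) =
    (word, words, runningCount) => {
      val newCount = words.size + runningCount.getOption().getOrElse(0L)
      runningCount.update(newCount)
      (word, newCount)
    }

  // Hypothetical driver: groups one batch by key, hands each group its
  // previous state, and writes the updated (or removed) state back to `store`.
  def runBatch(store: mutable.Map[String, Long],
               batch: Seq[String]): Map[String, Long] =
    batch.groupBy(identity).map { case (key, values) =>
      val state = new InMemoryGroupState[Long](store.get(key))
      val out = stateFunc(key, values.iterator, state)
      state.getOption() match {
        case Some(s) => store(key) = s
        case None    => store.remove(key); ()
      }
      out
    }
}
```

Calling GroupStateSketch.runBatch on the same store first with Seq("a", "b", "a") and then with Seq("a") yields Map("a" -> 2, "b" -> 1) followed by Map("a" -> 3), since the count for "a" is carried over between the two calls. The sketch says nothing about how the real engine checkpoints, partitions, or expires state.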