[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896866#comment-15896866
 ] 

Amit Sela commented on SPARK-19067:
-----------------------------------

It depends: will those timers be resettable, so that once I visit the state 
on timeout I can set a new timeout? If so, that could work.
You can get more insight into what I'm talking about from my implementation of 
Triggers in Apache Beam: 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L92
I used {{updateStateByKey}} because I need to visit the state even when 
there are no updates, for example when the watermark has passed the 
end of the window and it is time to fire a result (based on event time). If the 
new {{mapWithState}} supported resettable timers, I could keep the "next times 
to fire" in the state and set the timer to the closest one, so that the timer 
initiates the visit instead of my re-visiting all state as I do today.
Does this make sense?
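
To make the idea concrete, here is a minimal sketch in plain Scala with no Spark dependency. The names {{TimerState}}, {{nextTimer}} and {{onTimeout}} are hypothetical and are not part of Spark or Beam. The sketch assumes the per-key state holds the pending fire times and that the single timer is always set to the closest one:

```scala
import scala.collection.immutable.SortedSet

// Hypothetical per-key state: event-time instants (in ms) at which this key
// still has to fire a result.
final case class TimerState(pendingFireTimes: SortedSet[Long]) {
  // The single timer would be set to the closest pending fire time, if any.
  def nextTimer: Option[Long] = pendingFireTimes.headOption
}

object ResettableTimerSketch {
  // Called when the key's timer expires. Returns the fire times that are due
  // at the given watermark, plus the state to keep. The nextTimer of the
  // returned state is the timeout that would be re-set.
  def onTimeout(state: TimerState, watermark: Long): (List[Long], TimerState) = {
    val (due, remaining) = state.pendingFireTimes.partition(_ <= watermark)
    (due.toList, TimerState(remaining))
  }

  def main(args: Array[String]): Unit = {
    val initial = TimerState(SortedSet(3000L, 1000L, 2000L))
    val (fired, next) = onTimeout(initial, watermark = 2000L)
    // The two earliest times are due; the timer is re-set to the remaining one.
    println(s"fired=$fired nextTimer=${next.nextTimer}")
  }
}
```

With something like this, only keys whose timer has expired would be visited, rather than every key in every batch.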

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19067
>                 URL: https://issues.apache.org/jira/browse/SPARK-19067
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Michael Armbrust
>            Assignee: Tathagata Das
>            Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF.  However, this does not give users control over the emission or 
> expiration of state, making it hard to implement things like sessionization.  
> We should add a more general construct (probably similar to 
> {{DStream.mapWithState}}) to Structured Streaming. Here is the design. 
> *Requirements*
> - Users should be able to specify a function that can do the following:
> -- Access the input row corresponding to a key
> -- Access the previous state corresponding to a key
> -- Optionally, update or remove the state
> -- Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> // ------------ New methods on KeyValueGroupedDataset ------------
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U]
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], KeyedState[S]) => Iterator[U]): Dataset[U]
>   // Java friendly
>   def mapGroupsWithState[S, U](
>       func: MapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
>   def flatMapGroupsWithState[S, U](
>       func: FlatMapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
> }
> // ------------------- New Java-friendly function classes -------------------
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
> // ---------------------- Wrapper class for state data ----------------------
> trait KeyedState[S] {
>   def exists(): Boolean
>   def get(): S                   // throws an exception if the state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit             // exists() will be false after this
> }
> {code}
> Key semantics of the KeyedState class
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and 
> getOption will return None.
> - After state.update(newState) is called, state.exists() will return true, 
> and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
>   val newCount = words.size + runningCount.getOption.getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
> // dataset is a Dataset[String]
> dataset
>   .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout-based expiration of state that has not received data for a while
> - General expression-based expiration 
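
The state semantics and the usage example in the description above can be tried out without Spark. The following is a minimal sketch under stated assumptions: {{InMemoryKeyedState}} and {{runBatch}} are hypothetical stand-ins (a mutable map instead of a state store, a {{Seq}} instead of a micro-batch), and only the {{KeyedState}} trait and the body of {{stateFunc}} are taken from the proposal.

```scala
import scala.collection.mutable

// Mirrors the KeyedState trait from the proposal.
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
}

// Hypothetical in-memory implementation backed by a mutable map; not part of Spark.
final class InMemoryKeyedState[K, S](key: K, store: mutable.Map[K, S]) extends KeyedState[S] {
  def exists(): Boolean = store.contains(key)
  def get(): S = store.getOrElse(key, throw new NoSuchElementException(s"No state for key $key"))
  def getOption(): Option[S] = store.get(key)
  def update(newState: S): Unit = store.update(key, newState)
  def remove(): Unit = { store.remove(key); () }
}

object MapGroupsWithStateSketch {
  // Same logic as stateFunc in the usage example: a running count per word.
  val stateFunc: (String, Iterator[String], KeyedState[Long]) => (String, Long) =
    (word, words, runningCount) => {
      val newCount = words.size + runningCount.getOption().getOrElse(0L)
      runningCount.update(newCount)
      (word, newCount)
    }

  // Hypothetical stand-in for one micro-batch: group the input by key and
  // call stateFunc once for every key that has data in this batch.
  def runBatch(batch: Seq[String], store: mutable.Map[String, Long]): Map[String, Long] =
    batch.groupBy(identity).map { case (word, occurrences) =>
      stateFunc(word, occurrences.iterator, new InMemoryKeyedState(word, store))
    }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, Long]
    val first = runBatch(Seq("a", "b", "a"), store) // counts: a is 2, b is 1
    val second = runBatch(Seq("a", "c"), store)     // counts: a is 3, c is 1
    println(first)
    println(second)

    // After remove(), exists() is false and getOption() is None.
    val stateForA = new InMemoryKeyedState("a", store)
    stateForA.remove()
    println(s"exists=${stateForA.exists()} option=${stateForA.getOption()}")
  }
}
```

Note that {{runBatch}} only touches keys that received data in the batch, which is exactly why timeout-based expiration is listed as a separate future direction.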



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
