[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645370#comment-16645370 ]

Shixiong Zhu commented on SPARK-10816:
--------------------------------------

Thanks a lot for the design docs and prototypes. I had a long discussion with
[~tdas], and we think the design doc should also discuss alternative approaches.

We came up with three possible implementations:

[1] Put <key, list of events> into a state store. In each batch, for each key,
scan its sorted event list and use the watermark to find the finalized sessions
and output them.

[2] Put <(key, timestamp), event> into a state store. Here, the key used in the
state store is a tuple of the user key and the event timestamp. In each batch,
sort each partition by (key, timestamp) and scan the whole sorted partition to
find the finalized sessions and output them.

[3] Use two state stores, as stream-stream join does. The first state store
holds <key, number of events>, and the second holds <key + index, event>. When
we insert an event into the second state store, we use insertion sort to keep
the events ordered by timestamp, i.e., find the proper index for the new event
and shift the indices of the events after it. Then we can scan all keys and
their values in the state store to find the finalized sessions and output them.
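To make the three layouts easier to compare, here is a minimal pure-Python sketch. It assumes gap-based sessions (a session ends `GAP` after its last event and is finalized once the watermark reaches that end) and uses plain dicts in place of a real state store. `GAP`, `split_sessions`, `finalized_prefix`, and the three class names are hypothetical illustrations, not Spark APIs, and events carry only a timestamp.

```python
import bisect
from itertools import groupby

GAP = 10  # hypothetical session gap, same units as event timestamps

def split_sessions(tss):
    """Split sorted timestamps into sessions; a gap >= GAP starts a new one."""
    sessions = []
    for ts in tss:
        if sessions and ts - sessions[-1][-1] < GAP:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions

def finalized_prefix(sessions, watermark):
    """A session ends at last event + GAP. Ends increase, so finished ones form a prefix."""
    n = 0
    while n < len(sessions) and sessions[n][-1] + GAP <= watermark:
        n += 1
    return sessions[:n], sessions[n:]

class ListPerKeyStore:
    """[1] <key, sorted list of events>: the whole list of a key sits in memory."""
    def __init__(self):
        self.state = {}

    def process_batch(self, events, watermark):
        for key, ts in events:
            bisect.insort(self.state.setdefault(key, []), ts)
        out = []
        for key in list(self.state):
            done, rest = finalized_prefix(split_sessions(self.state[key]), watermark)
            out += [(key, s[0], s[-1] + GAP) for s in done]
            remaining = [ts for s in rest for ts in s]
            if remaining:
                self.state[key] = remaining
            else:
                del self.state[key]
        return out

class KeyTimestampStore:
    """[2] <(key, timestamp), event>: small entries, but each batch sorts the partition."""
    def __init__(self):
        self.state = {}  # duplicate (key, timestamp) pairs collapse in this sketch

    def process_batch(self, events, watermark):
        for key, ts in events:
            self.state[(key, ts)] = ts  # the event payload is just its timestamp here
        out = []
        # Scan the partition sorted by (key, timestamp); groupby is used for brevity.
        for key, group in groupby(sorted(self.state), key=lambda kt: kt[0]):
            done, _ = finalized_prefix(split_sessions([ts for _, ts in group]), watermark)
            for s in done:
                out.append((key, s[0], s[-1] + GAP))
                for ts in s:
                    del self.state[(key, ts)]
        return out

class IndexedTwoStores:
    """[3] <key, count> plus <(key, index), event>, kept sorted by insertion sort."""
    def __init__(self):
        self.counts, self.events = {}, {}

    def insert(self, key, ts):
        n = i = self.counts.get(key, 0)
        while i > 0 and self.events[(key, i - 1)] > ts:  # shift later events up by one
            self.events[(key, i)] = self.events[(key, i - 1)]
            i -= 1
        self.events[(key, i)] = ts
        self.counts[key] = n + 1

    def process_batch(self, events, watermark):
        for key, ts in events:
            self.insert(key, ts)
        out = []
        for key in list(self.counts):
            n = self.counts[key]
            tss = [self.events[(key, i)] for i in range(n)]  # already in timestamp order
            done, _ = finalized_prefix(split_sessions(tss), watermark)
            out += [(key, s[0], s[-1] + GAP) for s in done]
            k = sum(len(s) for s in done)
            for i in range(k, n):  # move the remaining events down to indices 0..n-k-1
                self.events[(key, i - k)] = self.events[(key, i)]
            for i in range(n - k, n):
                del self.events[(key, i)]
            if n > k:
                self.counts[key] = n - k
            else:
                del self.counts[key]
        return out
```

All three return the same (key, session start, session end) tuples for the same input. In this sketch, [3] pays for its per-entry layout with extra writes: every out-of-order event shifts all later indices of its key, and every emitted session shifts the remaining ones down.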

[1] is easy to implement and can be done directly with
`flatMapGroupsWithState`, but it may fail when a key has too many events,
because all events of a key are held in memory at once. [2] and [3] scale
better, but their performance may be worse.

If I read the code correctly, [https://github.com/apache/spark/pull/22583] is
[1], and [https://github.com/apache/spark/pull/22482] is a combination of [2]
and [3] but still needs to load all values of a key into memory at the same time.

[~kabhwan] [~XuanYuan], could you work together to update your design docs to
add these alternative approaches and discuss their pros and cons? It would be
great if you could put the design docs in a Google Doc so that it's easy to
leave comments.

In addition, it would be good to also discuss compatibility. For example, if we
decide to implement session windows with a new approach that changes the state
format in the state store, do we have enough version information to identify
the old and new formats?
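One way to make the compatibility question concrete is to record a state format version in the checkpoint metadata when a query first starts, and choose the decoder for restored state from it. The sketch below is hypothetical throughout: the `sessionStateFormatVersion` field, the decoders, and the v1/v2 layouts (borrowed from [1] and [2]) are illustrations, not existing Spark code. It also shows the catch behind the question: a checkpoint written before any version field existed can only be read by assuming a default.

```python
import json

# Hypothetical layouts: v1 = <key, list of events> as in [1],
# v2 = <(key, timestamp), event> as in [2]. Events are bare timestamps here.
def decode_v1(rows):
    return {key: sorted(tss) for key, tss in rows.items()}

def decode_v2(rows):
    out = {}
    for key, ts in sorted(rows):  # regroup (key, timestamp) entries per key
        out.setdefault(key, []).append(ts)
    return out

DECODERS = {1: decode_v1, 2: decode_v2}
CURRENT_VERSION = 2

def new_checkpoint_metadata():
    """Record the state format once, when the query first starts."""
    return json.dumps({"sessionStateFormatVersion": CURRENT_VERSION})

def restore_state(metadata_json, rows):
    # No version field: assume the oldest format, written before versioning existed.
    version = json.loads(metadata_json).get("sessionStateFormatVersion", 1)
    if version not in DECODERS:
        raise ValueError(f"unsupported session state format version {version}")
    return DECODERS[version](rows)
```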

> EventTime based sessionization
> ------------------------------
>
>                 Key: SPARK-10816
>                 URL: https://issues.apache.org/jira/browse/SPARK-10816
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf, Session 
> Window Support For Structure Streaming.pdf
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
