[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623659#comment-16623659 ]

Mike Sukmanowsky commented on SPARK-10816:
------------------------------------------

In case it helps to have a more concrete example, I documented a use case in this 
StackOverflow question 
[https://stackoverflow.com/questions/51810460/is-proper-event-time-sessionization-possible-with-spark-structured-streaming], 
which explains what we're hoping to do from a streaming standpoint (thanks to 
[~arunmahadevan] for chiming in with help on that).

In a batch context, the gist of what we're trying to do is pretty 
straightforward and, I'd imagine, in line with what a lot of Spark users are 
looking to do.

A session, for my purposes, is a period of uninterrupted activity for a user: 
no two consecutive events (ordered by event time, not processing time) are 
separated by more than some developer-defined gap duration (30 minutes is 
common). Given records like so in an events table: 
{code}
{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}
{code}
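For anyone who finds code clearer than prose, the gap rule above can be sketched in plain Python. This is purely illustrative (the helper name {{assign_user_sessions}} is made up, not a Spark API), but it shows the exact session semantics I mean:

{code:python}
from datetime import datetime, timedelta

# A new session starts whenever the gap to the user's previous event
# (ordered by event time) reaches the configured duration.
SESSION_GAP = timedelta(minutes=30)

def assign_user_sessions(events):
    """events: iterable of (event_time, user_id) tuples.
    Returns (user_id, event_time, user_session_id) rows."""
    rows = []
    last_seen = {}  # user_id -> last event_time seen
    counters = {}   # user_id -> current session counter
    # Order by (user_id, event_time), mirroring the window spec in SQL.
    for event_time, user_id in sorted(events, key=lambda e: (e[1], e[0])):
        last = last_seen.get(user_id)
        if last is None or event_time - last >= SESSION_GAP:
            counters[user_id] = counters.get(user_id, 0) + 1
        last_seen[user_id] = event_time
        rows.append((user_id, event_time, counters[user_id]))
    return rows

events = [
    (datetime(2018, 1, 1, 0, 0), "mike"),
    (datetime(2018, 1, 1, 0, 1), "mike"),
    (datetime(2018, 1, 1, 0, 5), "mike"),
    (datetime(2018, 1, 1, 0, 45), "mike"),
]
for row in assign_user_sessions(events):
    print(row)
{code}

For the four records above this yields session ids 1, 1, 1, 2: the 40-minute gap before the last event starts a new session.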
 
 We can write a batch query to add session id columns (note this is Postgres 
SQL, not Hive/Spark SQL):
{code:sql}
SELECT user_id,
       event_time,
       SUM(is_new_session) OVER (ORDER BY user_id, event_time) AS global_session_id,
       SUM(is_new_session) OVER (PARTITION BY user_id ORDER BY event_time) AS user_session_id
  FROM (
    SELECT *,
           -- 30-minute gap, expressed in seconds (30 * 60 = 1800)
           CASE WHEN EXTRACT('EPOCH' FROM event_time) - EXTRACT('EPOCH' FROM last_event) >= (30 * 60)
                  OR last_event IS NULL
                THEN 1 ELSE 0 END AS is_new_session
      FROM (
        SELECT user_id,
               event_time,
               LAG(event_time, 1) OVER (PARTITION BY user_id ORDER BY event_time) AS last_event
          FROM events
      ) last
  ) final
{code}
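Since that query only uses Postgres's {{EXTRACT('EPOCH' FROM ...)}}, it should translate almost directly to Spark SQL by swapping in {{unix_timestamp}}; everything else ({{LAG}}, {{SUM ... OVER}}, {{PARTITION BY}}) is standard window-function syntax Spark supports. An untested sketch of that translation, with the 30-minute gap as 30 * 60 seconds:

{code:sql}
-- Untested Spark SQL sketch; unix_timestamp() stands in for
-- Postgres's EXTRACT('EPOCH' FROM ...).
SELECT user_id,
       event_time,
       SUM(is_new_session) OVER (ORDER BY user_id, event_time) AS global_session_id,
       SUM(is_new_session) OVER (PARTITION BY user_id ORDER BY event_time) AS user_session_id
  FROM (
    SELECT *,
           CASE WHEN last_event IS NULL
                  OR unix_timestamp(event_time) - unix_timestamp(last_event) >= 30 * 60
                THEN 1 ELSE 0 END AS is_new_session
      FROM (
        SELECT user_id,
               event_time,
               LAG(event_time, 1) OVER (PARTITION BY user_id ORDER BY event_time) AS last_event
          FROM events
      ) last
  ) final
{code}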
Translating my exact session definition to flatMapGroupsWithState to work in a 
streaming context with the proper watermark configuration is something I 
haven't been able to figure out just yet.

It'd be great to have some official Spark examples of how to handle that use 
case, even if it's via flatMapGroupsWithState.

Even better would be some kind of simplified API that makes this all accessible 
to R and Python users as [~kabhwan] suggests.

> EventTime based sessionization
> ------------------------------
>
>                 Key: SPARK-10816
>                 URL: https://issues.apache.org/jira/browse/SPARK-10816
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
