[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623659#comment-16623659 ]
Mike Sukmanowsky commented on SPARK-10816:
------------------------------------------

In case it helps to have a more concrete example, I document a use case in this StackOverflow question [https://stackoverflow.com/questions/51810460/is-proper-event-time-sessionization-possible-with-spark-structured-streaming], which explains what we're hoping to do from a streaming standpoint (thanks to [~arunmahadevan] for chiming in with help on that).

In a batch context, the gist of what we're trying to do is straightforward and, I'd imagine, in line with what a lot of Spark users are looking to do. A session, for my purposes, is a group of uninterrupted activity for a user such that no two chronologically ordered (by event time, not processing time) events are separated by more than some developer-defined gap duration (30 minutes is common).

Given records like these in an events table:

{code}
{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}
{code}

we can write a batch query that adds a session_id column (note this is Postgres SQL, not Hive/Spark SQL):

{code:sql}
SELECT
  user_id,
  event_time,
  SUM(is_new_session) OVER (ORDER BY user_id, event_time) AS global_session_id,
  SUM(is_new_session) OVER (PARTITION BY user_id ORDER BY event_time) AS user_session_id
FROM (
  SELECT
    *,
    CASE
      WHEN EXTRACT('EPOCH' FROM event_time) - EXTRACT('EPOCH' FROM last_event) >= (30 * 60)
           OR last_event IS NULL
      THEN 1
      ELSE 0
    END AS is_new_session
  FROM (
    SELECT
      user_id,
      event_time,
      LAG(event_time, 1) OVER (PARTITION BY user_id ORDER BY event_time) AS last_event
    FROM events
  ) last
) final
{code}

(Note the gap is expressed in seconds, so 30 minutes is 30 * 60.) Translating this exact session definition to flatMapGroupsWithState so it works in a streaming context with the proper watermark configuration is something I haven't been able to figure out just yet.
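For anyone who finds the SQL hard to follow, here is a minimal sketch of the same gap-based session assignment in plain Python (not Spark; the function and variable names are my own, chosen for illustration). It mirrors the user_session_id column above: a new per-user session starts whenever the gap to the previous event is 30 minutes or more.

```python
from datetime import datetime, timedelta

# Gap after which a new session starts (the common 30-minute choice above).
SESSION_GAP = timedelta(minutes=30)

def assign_user_sessions(events, gap=SESSION_GAP):
    """Given [(event_time, user_id), ...], return
    [(event_time, user_id, user_session_id), ...] with session ids
    restarting at 1 for each user, as in the SQL's PARTITION BY."""
    out = []
    # Sort by user, then event time (event time, not processing time).
    for event_time, user_id in sorted(events, key=lambda e: (e[1], e[0])):
        prev = out[-1] if out and out[-1][1] == user_id else None
        if prev is None or event_time - prev[0] >= gap:
            session_id = (prev[2] + 1) if prev else 1
        else:
            session_id = prev[2]
        out.append((event_time, user_id, session_id))
    return out

# The four example events from the table above.
events = [
    (datetime(2018, 1, 1, 0, 0), "mike"),
    (datetime(2018, 1, 1, 0, 1), "mike"),
    (datetime(2018, 1, 1, 0, 5), "mike"),
    (datetime(2018, 1, 1, 0, 45), "mike"),
]
print(assign_user_sessions(events))
# The first three events share session 1; the 00:45 event starts session 2.
```

This is only the batch semantics; it says nothing about watermarks or late data.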
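flatMapGroupsWithState itself is a Scala/Java API, but the per-group state transition it would need can be sketched in plain Python (all names here are illustrative, not Spark API): keep one open session window per user, extend it while events arrive within the gap, and close it once an event lands at least a gap past the session's end. In a real streaming job the final session would be closed by a state timeout once the watermark passes the session end plus the gap.

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=30)

class SessionState:
    """Open session window for one user: [start, end] of observed event times."""
    def __init__(self, t):
        self.start = t
        self.end = t

def update_user_session(state, event_time, closed, gap=GAP):
    """One state transition, analogous to what a flatMapGroupsWithState
    update function would do for a single group (user). Appends finished
    sessions to `closed` and returns the new open state."""
    if state is None:
        return SessionState(event_time)
    if event_time - state.end >= gap:
        # Gap exceeded: close the old session, open a new one.
        closed.append((state.start, state.end))
        return SessionState(event_time)
    # Still inside the session: extend it.
    state.end = max(state.end, event_time)
    return state

# Feed the example events through the transition in event-time order.
closed = []
state = None
for t in [datetime(2018, 1, 1, 0, 0), datetime(2018, 1, 1, 0, 1),
          datetime(2018, 1, 1, 0, 5), datetime(2018, 1, 1, 0, 45)]:
    state = update_user_session(state, t, closed)
# A streaming job would close the last session via a timeout when the
# watermark advances past state.end + gap; here we close it by hand.
closed.append((state.start, state.end))
print(closed)
```

The sketch assumes events arrive per user in event-time order; handling out-of-order events within the watermark (merging windows) is exactly the part that makes the real translation tricky.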
It'd be great to have some official Spark examples of how to handle this use case, even if it's via flatMapGroupsWithState. Even better would be some kind of simplified API that makes this all accessible to R and Python users, as [~kabhwan] suggests.

> EventTime based sessionization
> ------------------------------
>
>                 Key: SPARK-10816
>                 URL: https://issues.apache.org/jira/browse/SPARK-10816
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Reynold Xin
>            Priority: Major
>         Attachments: SPARK-10816 Support session window natively.pdf

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)