[https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645705#comment-16645705]
Jungtaek Lim edited comment on SPARK-10816 at 10/11/18 1:21 AM:
----------------------------------------------------------------
[~zsxwing] [~tdas]
Thanks for taking the time to read my patch as well as the proposal. To make up
for the missing detailed design doc, please refer to the design doc below; it
should make the patch easier to follow.
[https://docs.google.com/document/d/1tUO29BDXb9127RiivUS7Hv324dC0YHuokYvyQRpurDY/edit?usp=sharing]
The overall concept of the approach is to sort the input rows (once within the
local partition, and once more after the shuffle) and keep the data sorted while
applying operations: a merge sort is applied when merging input rows with state
(previous sessions), and sort-based aggregation is applied directly when merging
sessions (it doesn't even leverage hash-based aggregation; it just applies the
aggregation directly while merging sessions).
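Here's a rough Scala sketch of the two-way merge step, just to illustrate the
idea. This is not the actual patch code; SessionRow and its fields are
placeholders I made up, and both inputs are assumed to be already sorted by
(key, start).
{code:scala}
// Hypothetical row shape: one aggregated session per row.
case class SessionRow(key: String, start: Long, end: Long, agg: Long)

def mergeSorted(
    input: BufferedIterator[SessionRow],
    state: BufferedIterator[SessionRow]): Iterator[SessionRow] =
  new Iterator[SessionRow] {
    private val ord = Ordering[(String, Long)]
    def hasNext: Boolean = input.hasNext || state.hasNext
    def next(): SessionRow =
      if (!state.hasNext) input.next()
      else if (!input.hasNext) state.next()
      // Pick the smaller head so the merged stream stays sorted:
      // no hashing and no full re-sort is needed.
      else if (ord.lteq((input.head.key, input.head.start),
                        (state.head.key, state.head.start))) input.next()
      else state.next()
  }
{code}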
This approach exploits the fact that the data is sorted, which is the optimal
state for merging sessions. Since this targets the simple session window (simple
gap based), there's no retraction of events from an already assigned window (if
I'm not missing anything), so we can safely apply the aggregation while merging
sessions instead of keeping all the events of a session.
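To make that concrete, here's a rough sketch of the gap-based merging with
eager aggregation, reusing the hypothetical SessionRow above. It assumes the
input is sorted by (key, start) and each session's end already includes the
gap; the sum on agg just stands in for whatever aggregation the query defines.
{code:scala}
def mergeSessions(sorted: List[SessionRow]): List[SessionRow] =
  sorted.foldLeft(List.empty[SessionRow]) {
    case (last :: rest, cur) if cur.key == last.key && cur.start <= last.end =>
      // Sessions overlap within the gap: extend the window and aggregate
      // immediately instead of buffering the individual events.
      last.copy(end = math.max(last.end, cur.end), agg = last.agg + cur.agg) :: rest
    case (acc, cur) => cur :: acc
  }.reverse
{code}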
Specific to the state store, my patch leverages [3], but it is not tightly
integrated with session windows, hence a bit suboptimal. I also described some
possible approaches in the doc to make it better.
{quote}[https://github.com/apache/spark/pull/22482] is a combination of [2] and
[3] but still need to load all values of a key into the memory at the same time.
{quote}
I'm not sure whether I understand correctly, but my patch loads all "previous
sessions" of a key, not all "events" of a key, so the chance of the values
getting big is pretty small, except for a rare edge case: a super small session
gap combined with a super long watermark delay. In the normal case I wouldn't
expect the number of previous sessions of a key to reach the hundreds, or even
the tens. In addition, each session contains "aggregated" columns, not all the
"events" in the session.
Unfortunately, having to sort the previous sessions of a key comes from an
optimization in the symmetric stream join state manager: it allows an unstable
order after removing events, to avoid shifting all the elements (which would
produce an unnecessary "delta"). So even though we put the sessions in in the
correct order, we have to sort anyway when we get them back from the state
manager. It is a trade-off, and I thought it was OK to leave it since the number
of elements to sort should be trivial.
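On the read path this boils down to something like the following sketch (again
with the hypothetical SessionRow):
{code:scala}
// The join-style state manager may swap elements on removal instead of
// shifting them (avoiding an unnecessary "delta"), so the stored order is
// unstable and we re-sort by start time whenever we read a key back. The
// number of previous sessions per key is expected to be small, so this
// re-sort should be cheap.
def getSessions(stored: Iterator[SessionRow]): Iterator[SessionRow] =
  stored.toArray.sortBy(_.start).iterator
{code}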
If it would be better to have a combined SPIP doc (i.e. include the detailed
design doc in the SPIP doc), please let me know and I'll add it.
> EventTime based sessionization
> ------------------------------
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Reynold Xin
> Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf, Session
> Window Support For Structure Streaming.pdf
>
>