[
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658599#comment-16658599
]
Jungtaek Lim edited comment on SPARK-10816 at 10/22/18 6:04 AM:
----------------------------------------------------------------
Just going back to review the original comment of [~zsxwing]. I think it takes
a state-oriented view, and various approaches are still available for how to
leverage the state format.
(For example, one big physical exec vs. smaller physical execs that leverage
existing physical execs.)
For now let's concentrate on the pros and cons of each state format.
[1] clearly works, and an implementation based on it is very simple. But it is
also clear that it requires loading all the sessions for a given key into
memory, as well as shifting elements left and right in the array. It may
require a search to determine whether a session is updated or new. It also
always overwrites the whole value of the given key, hence all the sessions for
that key are written to the delta whenever there is any change on the key.
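
A minimal sketch of the cost described for [1], under assumptions: the original
definition of [1] is not quoted in this comment, so the layout here (one state
value per key holding a sorted list of sessions) is inferred from the text.
Plain Python dicts and lists stand in for the state store and the delta; the
names add_event and update_state are hypothetical, not Spark APIs.

```python
from typing import Dict, List, Tuple

Session = Tuple[int, int]  # (start, end) in event time; hypothetical layout


def add_event(sessions: List[Session], ts: int, gap: int) -> List[Session]:
    """Merge one event into the sorted session list of a single key."""
    new_start, new_end = ts, ts + gap
    merged: List[Session] = []
    for start, end in sessions:
        if end < new_start or new_end < start:
            merged.append((start, end))  # untouched, but still rewritten later
        else:
            # Overlap: absorb the existing session into the new one.
            new_start, new_end = min(start, new_start), max(end, new_end)
    merged.append((new_start, new_end))
    merged.sort()  # stands in for shifting elements in the array
    return merged


def update_state(store: Dict[str, List[Session]],
                 delta: List[Tuple[str, List[Session]]],
                 key: str, ts: int, gap: int) -> None:
    sessions = store.get(key, [])      # loads every session for the key
    updated = add_event(sessions, ts, gap)
    store[key] = updated               # overwrites the whole value
    delta.append((key, updated))       # the delta carries all sessions again
```

Even when a single session changes, the entry appended to the delta contains
every session of the key, which is the write amplification mentioned above.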
[2] would help to overwrite only the updated sessions if we assume the start
timestamp of a session never changes; doing the same in [1] and [3] requires
some overhead for seeking and writing. Unfortunately the assumption is not
correct: the start timestamp of a session can also be modified by a late
event, so we need to handle that corner case, which means we still need to
traverse the sessions for the given key, and that would be hugely inefficient.
If keys are not sorted, it requires a full traversal of the key space.
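
To illustrate the corner case, here is a rough sketch that assumes [2] stores
one entry per session under a composite key of (key, session start). That
layout is inferred from the text, not taken from the original proposal, and
the dict and the add_event function are hypothetical stand-ins for a state
store.

```python
from typing import Dict, Tuple

# Hypothetical layout: (key, session_start) -> session_end
Store = Dict[Tuple[str, int], int]


def add_event(store: Store, key: str, ts: int, gap: int) -> None:
    new_start, new_end = ts, ts + gap
    # No point lookup is possible: the session that should absorb this event
    # may be stored under any start timestamp, so every entry is visited.
    # With an unsorted store this is a scan of the whole key space.
    for (k, start), end in list(store.items()):
        if k == key and not (end < new_start or new_end < start):
            del store[(k, start)]  # a late event can change the start itself
            new_start, new_end = min(start, new_start), max(end, new_end)
    store[(key, new_start)] = new_end
```

A late event at timestamp 95 that extends a session starting at 100 cannot be
handled by overwriting the entry stored under start 100: the old entry has to
be found and removed, and a new one written under start 95.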
[3] is the one that lets us play with the trade-off. It works basically like
[1], since it simulates an indexable array. It has some more overhead from
manipulating two states, but it doesn't require loading all the sessions for a
given key into memory. Moreover, it is tweaked to reduce the delta on removal
of elements, which has a possible downside, broken ordering, that we have to
weigh as a trade-off.
(One idea for addressing both the order of elements and the huge delta caused
by shifting is placing tombstones instead of removing elements, and
periodically removing the tombstones and shifting, maybe while taking a
snapshot if possible?)
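
The trade-off can be sketched as follows, assuming [3] keeps two states per
key: an element count and one entry per (key, index). This layout is inferred
from the "indexable array" description; the class and method names are
hypothetical, and Python dicts stand in for the two states.

```python
from typing import Dict, List, Optional, Tuple

Session = Tuple[int, int]  # (start, end); hypothetical layout


class IndexedSessionState:
    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}  # state 1: key -> number of slots
        # state 2: (key, index) -> session, or None for a tombstone
        self.slots: Dict[Tuple[str, int], Optional[Session]] = {}

    def append(self, key: str, session: Session) -> None:
        n = self.counts.get(key, 0)
        self.slots[(key, n)] = session
        self.counts[key] = n + 1

    def sessions(self, key: str) -> List[Session]:
        """Live sessions in index order, skipping tombstones."""
        found = (self.slots[(key, i)] for i in range(self.counts.get(key, 0)))
        return [s for s in found if s is not None]

    def remove_by_swap(self, key: str, index: int) -> None:
        """Fill the hole with the last element: small delta, order is lost."""
        last = self.counts[key] - 1
        self.slots[(key, index)] = self.slots[(key, last)]
        del self.slots[(key, last)]
        self.counts[key] = last

    def remove_by_tombstone(self, key: str, index: int) -> None:
        """Mark the slot as dead: one write, order is kept."""
        self.slots[(key, index)] = None

    def compact(self, key: str) -> None:
        """Drop tombstones and shift, e.g. while taking a snapshot."""
        live = self.sessions(key)
        for i in range(self.counts.get(key, 0)):
            del self.slots[(key, i)]
        for i, session in enumerate(live):
            self.slots[(key, i)] = session
        self.counts[key] = len(live)
```

Removing by swap touches only a few entries but leaves the sessions out of
order; the tombstone variant keeps the order and defers the shifting cost to
compact.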
While I said "some overhead" for manipulating two states, it becomes
considerable when the number of elements for a given key is huge. The thing
is, the key part is always duplicated for each value, hence taking redundant
memory as well as delta. [1] doesn't have this issue.
I guess it is unlikely that a given key has many valid sessions at any specific
time, which would make both [1] and [3] candidates, with [1] (maybe) optimal,
but that's just my opinion, so [3] may be the preferred way to go. My patch
leverages [3], but because elements get out of order after removal in [3], the
same memory issue as in [1] still exists, which I'll try to address without
introducing too much overhead.
Please let me know whether the above analysis would be good to include in the
SPIP or a detailed design doc.
Btw, we may also want to talk about where to add the complexity: one big
physical exec vs. smaller physical execs that leverage existing physical
execs. Both my patch and Baidu's take the latter approach, but I can try to
make changes if the Spark community prefers the former, to achieve high
cohesion in a specific exec instead of injecting new execs into multiple
places.
> EventTime based sessionization
> ------------------------------
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Reynold Xin
> Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf, Session
> Window Support For Structure Streaming.pdf
>
>