On 6/13/19 6:10 PM, Robert Bradshaw wrote:
On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]> wrote:
On 6/13/19 4:31 PM, Robert Bradshaw wrote:
The comment fails to take into account the asymmetry between
calling addInput vs. mergeAccumulators. It also focuses a lot on
the asymptotic behavior, when the most common behavior is likely
having a single (global) window.
Yes, occurred to me too. There are more questions here:
a) it would help if WindowedValue#explodeWindows returned a
List instead of an Iterable, because then optimizations would be
possible based on the number of windows (e.g. when there is only
a single window, there is no need to sort anything). This should be
a simple change, as it already is a List.
Well, I'm wary of this change, but we could always create a list out
of it (via cast or ImmutableList.copyOf) if we needed.
Why so? I thought this would be the least objectionable change, because
it actually *is* a List, and there is no interface involved; it is just a
public method whose signature needs to change to state that fact correctly.
A Collection would work just as well. Iterable is for cases where you don't
know whether the data is stored in memory or loaded from somewhere
else, and hence the size cannot be determined in advance. This is not
the case for WindowFn.
b) I'm a little confused why it is better, in general, to keep a key
with all its windows in a single WindowedValue. My first thought
would really be: explode all windows to (key, window) pairs, and
use these as keys as much as you can. There is an obvious small
drawback: this might be less efficient for windowing strategies
with a high number of windows per element (sliding windows with a
small slide compared to the window length). But that could be added to
the WindowFn and the decision could be made accordingly.
The optimization is for sliding windows, where it's more efficient to
send the value with all its windows and explode after the shuffle
rather than duplicate the value before. Of course this breaks that
ability with the hopes of being able to reduce the size more by doing
combining. (A half-way approach would be to group on unexploded window
set, which sounds worse generically but isn't bad in practice.)
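To put a number on the sliding-window cost discussed above, here is a minimal sketch (not Beam code) that lists the windows a single timestamp falls into. It assumes windows of the form [start, start + size) aligned to offset 0; the class and method names are hypothetical. Exploding before the shuffle would duplicate the value once per returned window.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowFanout {
    /**
     * Returns the start timestamps of every sliding window [start, start + size)
     * that contains ts. Assumes windows are aligned to offset 0 (an assumption
     * made for this sketch only).
     */
    static List<Long> windowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= ts.
        long lastStart = ts - Math.floorMod(ts, period);
        // Walk backwards while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }
}
```

With size 60 and period 10, each element lands in 6 windows, so exploding first ships 6 copies of the value through the shuffle; with size equal to period (tumbling) there is exactly one window and nothing is duplicated.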
I'd say, that adding something like WindowFn.isAssigningToSingleWindow()
would solve all the nuances.
Were I to implement this I would let the accumulator be a hashmap
Window -> Value/Timestamp. For the non-merging, when a
WindowedValue with N windows comes in, simply do O(N)
lookup+addInput calls. Merging these accumulator hashmaps is
pretty easy. For merging windows, I would first invoke the
WindowFn to determine which old windows + new windows merged into
bigger windows, and then construct the values of the bigger
windows with the appropriate createAccumulator, addInput, and/or
mergeAccumulators calls, depending on how many old vs. new values
there are. This is a merging call + O(N) additions.
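A minimal sketch of the non-merging half of this idea, under loud assumptions: windows are represented as plain String ids, the combine is a sum of longs, and timestamps are left out. None of these names come from Beam or the Spark runner; it only illustrates the "hashmap Window -> Value" accumulator shape.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PerWindowAccumulator {
    /**
     * Folds one value that was assigned to N windows into the accumulator:
     * N map lookups plus N addInput-style updates, no intermediate accumulators.
     */
    static void addInput(Map<String, Long> acc, List<String> windows, long value) {
        for (String window : windows) {
            acc.merge(window, value, Long::sum);
        }
    }

    /** Merges two per-window accumulators by combining values window by window. */
    static Map<String, Long> mergeAccumulators(Map<String, Long> left, Map<String, Long> right) {
        Map<String, Long> out = new HashMap<>(left);
        right.forEach((window, value) -> out.merge(window, value, Long::sum));
        return out;
    }
}
```

For merging windows this would not be enough on its own: as described above, the WindowFn would first have to say which old and new windows collapse into bigger ones before the per-window values are combined.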
Yep, that sounds like it could work. For single window (global,
tumbling) the approach above would still be probably more efficient.
Yes, for the global window one could do away with the (single-valued)
hashmap. Sessions.mergeWindows() does the tumbling to figure out which
windows to merge, so that'd be just as efficient.
+1
BTW, the code is broken because it hard codes SessionWindows
rather than calling WindowFn.mergeWindows(...). This is a
correctness, not just a performance, bug :(.
Can you point me to it in the code?
E.g.
https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106 always
merges things that are intersecting, rather than
querying WindowFn.mergeWindows to determine which, if any, should be
merged.
On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:
Hi Robert,
there is a comment around that code which states that the current
solution should be more efficient. I'd say that (for
non-merging windows) it would be best to first explode
windows, and only after that do combineByKey(key & window).
Merging windows would have to be handled the way it is, or
maybe it would be better to split this to
1) assign windows to elements
2) combineByKeyAndWindow
Jan
On 6/13/19 3:51 PM, Robert Bradshaw wrote:
I think the problem is that it never leverages the
(traditionally much cheaper)
CombineFn.addInput(old_accumulator, new_value). Instead, it
always calls CombineFn.mergeAccumulators(old_accumulator,
CombineFn.addInput(CombineFn.createAccumulator(),
new_value)). It should be feasible to fix this while still
handling windowing correctly. (The end-of-window timestamp
combiner could also be optimized because the timestamp need
not be tracked throughout in that case.)
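The difference between the two update paths can be sketched as follows. SimpleCombineFn is a hypothetical stand-in with the same three method names as CombineFn, not Beam's actual class, and the sum combiner is only an example. Both paths produce the same result; the first allocates and merges a throwaway accumulator for every single value.

```java
class AddInputVsMerge {
    /** Hypothetical minimal combiner interface, for illustration only. */
    interface SimpleCombineFn<I, A> {
        A createAccumulator();
        A addInput(A accumulator, I input);
        A mergeAccumulators(A left, A right);
    }

    /** Example combiner: sum of longs. */
    static final SimpleCombineFn<Long, Long> SUM = new SimpleCombineFn<Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long addInput(Long accumulator, Long input) { return accumulator + input; }
        @Override public Long mergeAccumulators(Long left, Long right) { return left + right; }
    };

    /** The pattern described above: wrap the value in a fresh accumulator, then merge. */
    static <I, A> A updateViaMerge(SimpleCombineFn<I, A> fn, A acc, I value) {
        return fn.mergeAccumulators(acc, fn.addInput(fn.createAccumulator(), value));
    }

    /** The cheaper path: fold the value directly into the existing accumulator. */
    static <I, A> A updateViaAddInput(SimpleCombineFn<I, A> fn, A acc, I value) {
        return fn.addInput(acc, value);
    }
}
```

For a sum the extra work is small, but for combiners whose accumulators are expensive to create or merge (sketches, top-N heaps, and the like) the per-element overhead of the first path adds up quickly.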
On the other hand, once we move everything to portability,
we'll probably toss all this code that uses Spark's
combiner lifting (instead using the GroupingCombiningTable
that's implemented naively in Beam, as we do for Python to
avoid fusion breaks).
On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
Hi,
I have hit a performance issue with the Spark runner that
seems to be related to its current Combine.perKey
implementation. I'll try to summarize what I have found
in the code:
- Combine.perKey uses Spark's combineByKey primitive,
which is pretty
similar to the definition of CombineFn
- it holds all elements as WindowedValues, and uses
Iterable<WindowedValue<Acc>> as accumulator (each
WindowedValue holds
accumulated state for each window)
- the update function is implemented as
1) convert value to Iterable<WindowedValue<Acc>>
2) merge accumulators for each window
The logic inside createAccumulator and mergeAccumulators
is quite non-trivial. Profiling shows that the two frames
where the code spends most of its time are:
41633930798 33.18% 4163
org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
19990682441 15.93% 1999
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
A simple change on code from
PCollection<..> input = ...
input.apply(Combine.perKey(...))
to
PCollection<..> input = ...
input
.apply(GroupByKey.create())
.apply(Combine.groupedValues(...))
had a drastic impact on the job run time (minutes as
opposed to hours, after which the first job still
hadn't finished!).
I think I understand why the current logic is implemented
the way it is: it has to deal with merging windows. But the
consequence seems to be that the implementation is very
inefficient.
Has anyone seen similar behavior? Does my analysis of
the problem seem
correct?
Jan