On 6/13/19 4:31 PM, Robert Bradshaw wrote:
The comment fails to take into account the asymmetry between calling
addInput vs. mergeAccumulators. It also focuses a lot on the
asymptotic behavior, whereas the most common case is likely a
single (global) window.
Yes, that occurred to me too. There are more questions here:
a) it would help if WindowedValue#explodeWindows returned a List
instead of an Iterable, because then optimizations would be possible based
on the number of windows (e.g. when there is only a single window, there is
no need to sort anything). This should be a simple change, as the underlying
collection already is a List.
b) I'm a little confused about why, in general, it is better to keep a key
with all its windows in a single WindowedValue. My first thought would
really be to explode all windows into (key, window) pairs and use these as
keys as much as you can. There is an obvious small drawback: this might be
less efficient for windowing strategies with a high number of windows per
element (sliding windows with a small slide compared to the window length).
But that information could be added to the WindowFn and the decision made
accordingly (a rough sketch of what I mean follows below).
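
To make option b) concrete, here is a rough, single-JVM sketch of exploding
elements into (key, window) composite keys and running the CombineFn per
composite key. It only illustrates the data flow; the class and method names
are made up, and the real change would live in the Spark runner's
combineByKey translation:

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

class CombinePerKeyAndWindowSketch {

  // Explode each element into one entry per window and combine on (key, window).
  // When an element is in a single (e.g. global) window, explodeWindows() yields
  // just one entry, so the "explosion" is effectively free.
  static <K, InputT, AccumT> Map<KV<K, BoundedWindow>, AccumT> combine(
      Iterable<WindowedValue<KV<K, InputT>>> elements,
      CombineFn<InputT, AccumT, ?> fn) {
    Map<KV<K, BoundedWindow>, AccumT> accumulators = new HashMap<>();
    for (WindowedValue<KV<K, InputT>> element : elements) {
      for (WindowedValue<KV<K, InputT>> single : element.explodeWindows()) {
        BoundedWindow window = single.getWindows().iterator().next();
        KV<K, BoundedWindow> compositeKey = KV.of(single.getValue().getKey(), window);
        AccumT acc = accumulators.get(compositeKey);
        if (acc == null) {
          acc = fn.createAccumulator();
        }
        accumulators.put(compositeKey, fn.addInput(acc, single.getValue().getValue()));
      }
    }
    return accumulators;
  }
}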
Were I to implement this I would let the accumulator be a hashmap
Window -> Value/Timestamp. For the non-merging case, when a WindowedValue
with N windows comes in, simply do O(N) lookup + addInput calls.
Merging these accumulator hashmaps is pretty easy. For merging
windows, I would first invoke the WindowFn to determine which old +
new windows get merged into bigger windows, and then construct
the values of the bigger windows with the appropriate
createAccumulator, addInput, and/or mergeAccumulators calls, depending
on how many old vs. new values there are. This is one merging call +
O(N) additions.
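
A minimal sketch of that accumulator shape, written here against Beam's
CombineFn API rather than taken from the Spark runner; timestamp tracking and
the merging-window path (WindowFn.mergeWindows) are intentionally left out:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;

class WindowMapAccumulator<InputT, AccumT> {

  // One CombineFn accumulator per window of the key.
  final Map<BoundedWindow, AccumT> perWindow = new HashMap<>();

  // Adding an element with N windows is N lookup + addInput calls.
  void addInput(CombineFn<InputT, AccumT, ?> fn, WindowedValue<InputT> value) {
    for (BoundedWindow window : value.getWindows()) {
      AccumT acc = perWindow.get(window);
      perWindow.put(
          window,
          fn.addInput(acc == null ? fn.createAccumulator() : acc, value.getValue()));
    }
  }

  // Merging two such accumulators only needs mergeAccumulators where windows collide.
  // For a merging WindowFn, one would first call WindowFn.mergeWindows(...) and remap
  // the keys of both maps accordingly (omitted here).
  void merge(CombineFn<InputT, AccumT, ?> fn, WindowMapAccumulator<InputT, AccumT> other) {
    for (Map.Entry<BoundedWindow, AccumT> entry : other.perWindow.entrySet()) {
      AccumT mine = perWindow.get(entry.getKey());
      perWindow.put(
          entry.getKey(),
          mine == null
              ? entry.getValue()
              : fn.mergeAccumulators(Arrays.asList(mine, entry.getValue())));
    }
  }
}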
Yep, that sounds like it could work. For a single window (global,
tumbling), the approach above would probably still be more efficient.
BTW, the code is broken because it hard codes SessionWindows rather
than calling WindowFn.mergeWindows(...). This is a correctness, not
just a performance, bug :(.
Can you point me to it in the code?
On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:
Hi Robert,
there is a comment around that which states that the current
solution should be more efficient. I'd say that (for non-merging
windows) it would be best to first explode windows and only after
that do combineByKey(key & window). Merging windows would have to
be handled the way they are now, or maybe it would be better to split this into
1) assign windows to elements
2) combineByKeyAndWindow
Jan
On 6/13/19 3:51 PM, Robert Bradshaw wrote:
I think the problem is that it never leverages the (traditionally
much cheaper) CombineFn.addInput(old_accumulator, new_value).
Instead, it always calls
CombineFn.mergeAccumulators(old_accumulator,
CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It
should be feasible to fix this while still handling windowing
correctly. (The end-of-window timestamp combiner could also be
optimized because the timestamp need not be tracked throughout in
that case.)
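
To make the difference concrete, a small sketch against the CombineFn API
(the class, method, and variable names are placeholders, not the runner's
actual code):

import java.util.Arrays;

import org.apache.beam.sdk.transforms.Combine.CombineFn;

class AddInputVsMergeAccumulators {

  // What the translation effectively does today for every incoming value: a fresh
  // accumulator, one addInput into it, then a full mergeAccumulators.
  static <InputT, AccumT> AccumT currentPattern(
      CombineFn<InputT, AccumT, ?> fn, AccumT oldAccumulator, InputT newValue) {
    return fn.mergeAccumulators(
        Arrays.asList(oldAccumulator, fn.addInput(fn.createAccumulator(), newValue)));
  }

  // The traditionally much cheaper path: fold the value into the existing accumulator.
  static <InputT, AccumT> AccumT cheaperPattern(
      CombineFn<InputT, AccumT, ?> fn, AccumT oldAccumulator, InputT newValue) {
    return fn.addInput(oldAccumulator, newValue);
  }
}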
On the other hand, once we move everything to portability,
we'll probably toss all of this code that uses Spark's combiner
lifting (instead using the GroupingCombiningTable that's
implemented naively in Beam, as we do for Python to avoid fusion
breaks).
On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
Hi,
I have hit a performance issue with the Spark runner that seems to be
related to its current Combine.perKey implementation. I'll try to
summarize what I have found in the code:
- Combine.perKey uses Spark's combineByKey primitive, which is pretty
similar to the definition of CombineFn (the sketch below shows the
shape of that primitive)
- it holds all elements as WindowedValues and uses
Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue
holds the accumulated state for one window)
- the update function is implemented as
1) convert the value to Iterable<WindowedValue<Acc>>
2) merge the accumulators for each window
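
For reference, the sketch below only shows the shape of Spark's combineByKey
primitive and how CombineFn-style functions would fit its three slots; it is
not the actual translator code:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

class CombineByKeyShape {

  // createCombiner ~ createAccumulator + addInput, mergeValue ~ addInput,
  // mergeCombiners ~ mergeAccumulators.
  static <K, V, C> JavaPairRDD<K, C> combine(
      JavaPairRDD<K, V> input,
      Function<V, C> createCombiner,
      Function2<C, V, C> mergeValue,
      Function2<C, C, C> mergeCombiners) {
    return input.combineByKey(createCombiner, mergeValue, mergeCombiners);
  }
}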
The logic inside createAccumulator and mergeAccumulators is quite
non-trivial. The result of profiling is that the two frames where the
code spends most of its time are:
41633930798 33.18% 4163
org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
19990682441 15.93% 1999
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
A simple change of the code from

PCollection<..> input = ...
input.apply(Combine.perKey(...))

to

PCollection<..> input = ...
input
    .apply(GroupByKey.create())
    .apply(Combine.groupedValues(...))

had a drastic impact on the job run time (minutes, as opposed to hours
after which the first job still had not finished!).
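
For completeness, here is a self-contained variant of that workaround with a
concrete combiner (Sum.ofLongs() and the tiny Create input are just
stand-ins; the real job and its types are of course different):

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CombineWorkaround {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, Long>> input =
        p.apply(Create.of(Arrays.asList(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L))));

    // Original form, translated by the Spark runner via its combineByKey path.
    PCollection<KV<String, Long>> viaCombinePerKey =
        input.apply("CombinePerKey", Combine.<String, Long, Long>perKey(Sum.ofLongs()));

    // Workaround: group first, then combine the already-grouped values.
    PCollection<KV<String, Long>> viaGroupedValues =
        input
            .apply(GroupByKey.<String, Long>create())
            .apply("CombineGrouped", Combine.<String, Long, Long>groupedValues(Sum.ofLongs()));

    p.run().waitUntilFinish();
  }
}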
I think I understand why the current logic is implemented the way it is
(it has to deal with merging windows), but the consequence seems to be
that the implementation is very inefficient.
Has anyone seen similar behavior? Does my analysis of the problem seem
correct?
Jan