> It depends on the typical number of elements per window. E.g. consider sliding windows with a size of one hour and a periodicity of one minute. If we have one datum per hour, better to not explode. If we have one datum per minute, it's a wash. If we have one datum per second, much better to explode.

I'm not sure if I follow. I'd say it works the opposite. If there is one record per second, then exploding windows before shuffle would mean we have to shuffle 60x more data. That becomes critical as more and more data is added, so exploding a dataset with 1000s of records per second will become really bad.

Either way, these are runtime conditions that are not known in advance, so it cannot be used in pipeline translation.

I have created JIRA [1] to track that and will try some experiments.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7574

On 6/14/19 1:42 PM, Robert Bradshaw wrote:
On Fri, Jun 14, 2019 at 1:02 PM Jan Lukavský <[email protected]> wrote:
  > Interesting. However, there we should never need to sort the windows
of the input, only the set of live windows (of which there may be any
number regardless of whether WindowFn does singleton assignments, and
only then in the merging case).

Ack, however the current implementation creates accumulator from input
and therefore sorts windows for all elements. Moreover, it doesn't
distinguish between merging and non-merging windows.
Yes, the create accumulator from input (and then merging the created
accumulators) is where the insane overheads are coming from. We should
not use that pattern.

  > This seems like a merging vs. non-merging choice, not a
single-vs-multiple window choice.

I'd say the condition should be non-merging && not many windows per
element, because otherwise it makes sense to group the windows although
it is non-merging windowing (sliding with small slide step). Otherwise
we would explode the data too much.
It depends on the typical number of elements per window. E.g. consider
sliding windows with a size of one hour and a periodicity of one
minute. If we have one datum per hour, better to not explode. If we
have one datum per minute, it's a wash. If we have one datum per
second, much better to explode.

On 6/14/19 12:19 PM, Robert Bradshaw wrote:
On Fri, Jun 14, 2019 at 12:10 PM Jan Lukavský <[email protected]> wrote:
Hi Robert,

thanks for the discussion. I will create a JIRA with summary of this.
Some comments inline.

Jan

On 6/14/19 10:49 AM, Robert Bradshaw wrote:
On Thu, Jun 13, 2019 at 8:43 PM Jan Lukavský <[email protected]> wrote:
On 6/13/19 6:10 PM, Robert Bradshaw wrote:

On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]> wrote:
On 6/13/19 4:31 PM, Robert Bradshaw wrote:

The comment fails to take into account the asymmetry between calling addInput 
vs. mergeAccumulators. It also focuses a lot on the asymptotic behavior, when 
the most common behavior is likely having a single (global) window.

Yes, occurred to me too. There are more questions here:

    a) it would help if WindowedValue#explodeWindows would return List instead 
of Iterable, because then optimizations would be possible based on number of 
windows (e.g. when there is only single window, there is no need to sort 
anything). This should be simple change, as it already is a List.
Well, I'm wary of this change, but we could always create a list out of it (via 
cast or ImmutableList.copyOf) if we needed.

Why so? I thought this would be the least objectionable change, because it 
actually *is* a List, and there is no interface, it is just a public method, 
that needs to be changed and state the fact correctly. A Collection would be 
the same. Iterable is for cases, where you don't exactly know if the data is 
stored in memory, or loaded from somewhere else and whence the size cannot be 
determined in advance. This is not the case for WindowFn.
It constrains us in that if WindowFn returns an iterable and we want
to store it as such we can no longer do so. I'm also not seeing what
optimization there is here--the thing we'd want to sort is the set of
existing windows (plus perhaps this one). Even if we wanted to do the
sort here, sort of a 1-item list should be insanely cheap.
Agree, that this should be probably suffice to do on the WindowFn. Then
there is no need to retrieve the number of windows for element, because
what actually matters most is whether the count is == 1 or > 1. The
WindowFn can give such information. Sorting on 1-item list on the other
hand is not that cheap as it might look. It invokes TimSort and does
sone calculations that appeared on my CPU profile quite often.
Interesting. However, there we should never need to sort the windows
of the input, only the set of live windows (of which there may be any
number regardless of whether WindowFn does singleton assignments, and
only then in the merging case).

    b) I'm a little confused why it is better to keep key with all its windows 
in single WindowedValue in general. My first thoughts would really be - explode 
all windows to (key, window) pairs, and use these as keys as much as you can - 
there is obvious small drawback, this might be less efficient for windowing 
strategies with high number of windows per element (sliding windows with small 
slide compared to window length). But that could be added to the WindowFn and 
decision could be made accordingly.
The optimization is for sliding windows, where it's more efficient to send the 
value with all its windows and explode after the shuffle rather than duplicate 
the value before. Of course this breaks that ability with the hopes of being 
able to reduce the size more by doing combining. (A half-way approach would be 
to group on unexploded window set, which sounds worse generically but isn't bad 
in practice.)

I'd say, that adding something like WindowFn.isAssigningToSingleWindow() would 
solve all the nuances.
If it helped. As the interface returns a list, I don't see much we can
skip in this case.
The information can be used to add the (single) window label into the
grouping key and skip all the other stuff.
This seems like a merging vs. non-merging choice, not a
single-vs-multiple window choice.

Were I to implement this I would let the accumulator be a hashmap Window -> 
Value/Timestamp. For the non-merging, when a WindowedValue with N windows comes 
in, simply do O(N) lookup+addInput calls. Merging these accumulator hashmaps is 
pretty easy. For merging windows, I would first invoke the WindowFn to determine 
which old windows + new windows merged into bigger windows, and then construct the 
values of the bigger windows with the appropriate createAccumulator, addInput, 
and/or mergeAccumulators calls, depending on how many old vs. new values there 
are. This is a merging call + O(N) additions.

Yep, that sounds like it could work. For single window (global, tumbling) the 
approach above would still be probably more efficient.
Yes, for the global window one could do away with the (single-valued) hashmap. 
Sessions.mergeWindows() does the tumbling to figure out which windows to merge, 
so that'd be just as efficient.

+1
Granted we should keep in mind that all of these further optimizations
probably pale in comparison to just getting rid of using
mergeAccumulators where we should be using addInput. And this is, in
the long run, dead code.


BTW, the code is broken because it hard codes SessionWindows rather than 
calling WindowFn.mergeWindows(...). This is a correctness, not just a 
performance, bug :(.

Can you point me out in the code?
E.g. 
https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106
 always merges things that are intersecting, rather than querying 
WindowFn.mergeWindows to determine which, if any, should be merged.

On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:
Hi Robert,

there is a comment around that which states, that the current solution should be 
more efficient. I'd say, that (for non-merging windows) it would be best to first 
explode windows, and only after that do combineByKey(key & window). Merging 
windows would have to be handled the way it is, or maybe it would be better to 
split this to

    1) assign windows to elements

    2) combineByKeyAndWindow

Jan

On 6/13/19 3:51 PM, Robert Bradshaw wrote:

I think the problem is that it never leverages the (traditionally much cheaper) 
CombineFn.addInput(old_accumulator, new_value). Instead, it always calls 
CombineFn.mergeAccumulators(old_accumulator, 
CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be 
feasible to fix this while still handling windowing correctly. (The 
end-of-window timestamp combiner could also be optimized because the timestamp 
need not be tracked throughout in that case.)

On the other hand, once we move everything to portability, it's we'll probably 
toss all this code that use Spark's combiner lifting (instead using the 
GroupingCombiningTable that's implemented naively in Beam, as we do for Python 
to avoid fusion breaks).

On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
Hi,

I have hit a performance issue with Spark runner, that seems to related
to its current Combine.perKey implementation. I'll try to summarize what
I have found in the code:

     - Combine.perKey uses Spark's combineByKey primitive, which is pretty
similar to the definition of CombineFn

     - it holds all elements as WindowedValues, and uses
Iterable<WindowedValue<Acc>> as accumulator (each WindowedValue holds
accumulated state for each window)

     - the update function is implemented as

      1) convert value to Iterable<WindowedValue<Acc>>

      2) merge accumulators for each windows

The logic inside createAccumulator and mergeAccumulators is quite
non-trivial. The result of profiling is that two frames where the code
spends most of the time are:

     41633930798   33.18%     4163
org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
     19990682441   15.93%     1999
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable

A simple change on code from

     PCollection<..> input = ...

     input.apply(Combine.perKey(...))

to

     PCollection<..> input = ...

     input

       .apply(GroupByKey.create())

       .apply(Combine.groupedValues(...))

had drastical impact on the job run time (minutes as opposed to hours,
after which the first job didn't even finish!).

I think I understand the reason why the current logic is implemented as
it is, it has to deal with merging windows. But the consequences seem to
be that it renders the implementation very inefficient.

Has anyone seen similar behavior? Does my analysis of the problem seem
correct?

Jan


Reply via email to