The comment fails to take into account the asymmetry between calling addInput vs. mergeAccumulators. It also focuses a lot on asymptotic behavior, when the most common case is likely a single (global) window.
Were I to implement this, I would let the accumulator be a hashmap Window -> Value/Timestamp. For the non-merging case, when a WindowedValue with N windows comes in, simply do O(N) lookup + addInput calls. Merging these accumulator hashmaps is pretty easy.

For merging windows, I would first invoke the WindowFn to determine which old windows + new windows merged into bigger windows, and then construct the values of the bigger windows with the appropriate createAccumulator, addInput, and/or mergeAccumulators calls, depending on how many old vs. new values there are. This is one merging call + O(N) additions.

BTW, the code is broken because it hard-codes SessionWindows rather than calling WindowFn.mergeWindows(...). This is a correctness bug, not just a performance one :(.
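
To make that shape concrete, here is a rough, hypothetical sketch of such a per-key, per-window state. The class and field names are invented, the timestamp handling is only a stand-in for whatever TimestampCombiner the windowing strategy specifies, and the merging-window step is just indicated in a comment; this is not the actual Spark runner code.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.transforms.Combine.CombineFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    /** Hypothetical per-key combine state, keyed by window. */
    class WindowedCombineState<InputT, AccumT> {

      private static class WindowedAccum<A> {
        A accum;
        Instant timestamp;

        WindowedAccum(A accum, Instant timestamp) {
          this.accum = accum;
          this.timestamp = timestamp;
        }
      }

      private final CombineFn<InputT, AccumT, ?> combineFn;
      private final Map<BoundedWindow, WindowedAccum<AccumT>> perWindow = new HashMap<>();

      WindowedCombineState(CombineFn<InputT, AccumT, ?> combineFn) {
        this.combineFn = combineFn;
      }

      /** Non-merging path: O(N) lookups for a value in N windows, each using the cheap addInput. */
      void addInput(InputT value, Instant timestamp, Iterable<BoundedWindow> windows) {
        for (BoundedWindow window : windows) {
          WindowedAccum<AccumT> existing = perWindow.get(window);
          if (existing == null) {
            perWindow.put(
                window,
                new WindowedAccum<>(
                    combineFn.addInput(combineFn.createAccumulator(), value), timestamp));
          } else {
            existing.accum = combineFn.addInput(existing.accum, value);
            existing.timestamp = earlier(existing.timestamp, timestamp);
          }
        }
      }

      /** Merging two per-key states (Spark's mergeCombiners) is a plain map union. */
      void mergeFrom(WindowedCombineState<InputT, AccumT> other) {
        for (Map.Entry<BoundedWindow, WindowedAccum<AccumT>> e : other.perWindow.entrySet()) {
          WindowedAccum<AccumT> existing = perWindow.get(e.getKey());
          if (existing == null) {
            perWindow.put(e.getKey(), e.getValue());
          } else {
            existing.accum =
                combineFn.mergeAccumulators(Arrays.asList(existing.accum, e.getValue().accum));
            existing.timestamp = earlier(existing.timestamp, e.getValue().timestamp);
          }
        }
        // For a merging WindowFn, a further step would ask WindowFn.mergeWindows(...)
        // (never a hard-coded SessionWindows) which keys of this map collapse together,
        // and then fold their accumulators with createAccumulator, addInput and/or
        // mergeAccumulators as appropriate. That step is omitted here.
      }

      /** Keeps the earliest timestamp; a real implementation would follow the TimestampCombiner. */
      private static Instant earlier(Instant a, Instant b) {
        return a.isBefore(b) ? a : b;
      }
    }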

On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:

> Hi Robert,
>
> there is a comment around that which states that the current solution
> should be more efficient. I'd say that (for non-merging windows) it would
> be best to first explode windows, and only after that do combineByKey(key &
> window). Merging windows would have to be handled the way it is, or maybe
> it would be better to split this into
>
> 1) assign windows to elements
>
> 2) combineByKeyAndWindow
>
> Jan
>
> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
>
> > I think the problem is that it never leverages the (traditionally much
> > cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it
> > always calls CombineFn.mergeAccumulators(old_accumulator,
> > CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should
> > be feasible to fix this while still handling windowing correctly. (The
> > end-of-window timestamp combiner could also be optimized, because the
> > timestamp need not be tracked throughout in that case.)
> >
> > On the other hand, once we move everything to portability, we'll
> > probably toss all this code that uses Spark's combiner lifting (instead
> > using the GroupingCombiningTable that's implemented naively in Beam, as
> > we do for Python to avoid fusion breaks).
>
> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
>
>> Hi,
>>
>> I have hit a performance issue with the Spark runner that seems to be
>> related to its current Combine.perKey implementation. I'll try to
>> summarize what I have found in the code:
>>
>> - Combine.perKey uses Spark's combineByKey primitive, which is pretty
>> similar to the definition of CombineFn
>>
>> - it holds all elements as WindowedValues, and uses
>> Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue
>> holds the accumulated state for one window)
>>
>> - the update function is implemented as
>>
>> 1) convert value to Iterable<WindowedValue<Acc>>
>>
>> 2) merge accumulators for each window
>>
>> The logic inside createAccumulator and mergeAccumulators is quite
>> non-trivial. The result of profiling is that the two frames where the
>> code spends most of the time are:
>>
>> 41633930798 33.18% 4163
>> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
>> 19990682441 15.93% 1999
>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
>>
>> A simple change from
>>
>> PCollection<..> input = ...
>> input.apply(Combine.perKey(...))
>>
>> to
>>
>> PCollection<..> input = ...
>> input
>>     .apply(GroupByKey.create())
>>     .apply(Combine.groupedValues(...))
>>
>> had a drastic impact on the job run time (minutes as opposed to hours,
>> after which the first job didn't even finish!).
>>
>> I think I understand the reason why the current logic is implemented
>> the way it is: it has to deal with merging windows. But the consequence
>> seems to be that it renders the implementation very inefficient.
>>
>> Has anyone seen similar behavior? Does my analysis of the problem seem
>> correct?
>>
>> Jan
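
The addInput vs. mergeAccumulators asymmetry mentioned at the top, and spelled out in the quoted 3:51 PM message, comes down to the two per-element code paths below. This is only a schematic illustration: the class and method names are made up, and note that the real CombineFn.mergeAccumulators takes an Iterable of accumulators rather than a pair.

    import java.util.Arrays;

    import org.apache.beam.sdk.transforms.Combine.CombineFn;

    class AddInputVsMergePattern {

      /** The cheap per-element update a lifted combiner is expected to make. */
      static <InputT, AccumT> AccumT directAddInput(
          CombineFn<InputT, AccumT, ?> fn, AccumT accumulator, InputT newValue) {
        return fn.addInput(accumulator, newValue);
      }

      /**
       * The per-element pattern described in the quoted message: wrap the new value
       * in a freshly created accumulator, then merge two accumulators. For many
       * CombineFns this is considerably more expensive than a single addInput call.
       */
      static <InputT, AccumT> AccumT wrapThenMerge(
          CombineFn<InputT, AccumT, ?> fn, AccumT accumulator, InputT newValue) {
        AccumT wrapped = fn.addInput(fn.createAccumulator(), newValue);
        return fn.mergeAccumulators(Arrays.asList(accumulator, wrapped));
      }
    }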

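For completeness, here is Jan's workaround from the quoted message, written out as a self-contained toy pipeline. Sum.ofLongs() and the Create input are placeholders for the real CombineFn and data; the only point is the GroupByKey + Combine.groupedValues shape that side-steps the runner's combineByKey translation.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class CombinePerKeyWorkaround {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<KV<String, Long>> input =
            p.apply(Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L)));

        // Original form, which goes through the Spark runner's combineByKey translation:
        //   input.apply(Combine.perKey(Sum.ofLongs()));

        // Workaround from the thread: group first, then combine the grouped values.
        PCollection<KV<String, Long>> sums =
            input
                .apply(GroupByKey.create())
                .apply(Combine.groupedValues(Sum.ofLongs()));

        p.run().waitUntilFinish();
      }
    }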