On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]> wrote:
> On 6/13/19 4:31 PM, Robert Bradshaw wrote: > > The comment fails to take into account the asymmetry between calling > addInput vs. mergeAccumulators. It also focuses a lot on the asymptotic > behavior, when the most common behavior is likely having a single (global) > window. > > Yes, occurred to me too. There are more questions here: > > a) it would help if WindowedValue#explodeWindows would return List > instead of Iterable, because then optimizations would be possible based on > number of windows (e.g. when there is only single window, there is no need > to sort anything). This should be simple change, as it already is a List. > Well, I'm wary of this change, but we could always create a list out of it (via cast or ImmutableList.copyOf) if we needed. > b) I'm a little confused why it is better to keep key with all its > windows in single WindowedValue in general. My first thoughts would really > be - explode all windows to (key, window) pairs, and use these as keys as > much as you can - there is obvious small drawback, this might be less > efficient for windowing strategies with high number of windows per element > (sliding windows with small slide compared to window length). But that > could be added to the WindowFn and decision could be made accordingly. > The optimization is for sliding windows, where it's more efficient to send the value with all its windows and explode after the shuffle rather than duplicate the value before. Of course this breaks that ability with the hopes of being able to reduce the size more by doing combining. (A half-way approach would be to group on unexploded window set, which sounds worse generically but isn't bad in practice.) > Were I to implement this I would let the accumulator be a hashmap Window > -> Value/Timestamp. For the non-merging, when a WindowedValue with N > windows comes in, simply do O(N) lookup+addInput calls. Merging these > accumulator hashmaps is pretty easy. For merging windows, I would first > invoke the WindowFn to determine which old windows + new windows merged > into bigger windows, and then construct the values of the bigger windows > with the appropriate createAccumulator, addInput, and/or mergeAccumulators > calls, depending on how many old vs. new values there are. This is a > merging call + O(N) additions. > > Yep, that sounds like it could work. For single window (global, tumbling) > the approach above would still be probably more efficient. > Yes, for the global window one could do away with the (single-valued) hashmap. Sessions.mergeWindows() does the tumbling to figure out which windows to merge, so that'd be just as efficient. > > BTW, the code is broken because it hard codes SessionWindows rather than > calling WindowFn.mergeWindows(...). This is a correctness, not just a > performance, bug :(. > > Can you point me out in the code? > E.g. https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106 always merges things that are intersecting, rather than querying WindowFn.mergeWindows to determine which, if any, should be merged. On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote: > >> Hi Robert, >> >> there is a comment around that which states, that the current solution >> should be more efficient. I'd say, that (for non-merging windows) it would >> be best to first explode windows, and only after that do combineByKey(key & >> window). Merging windows would have to be handled the way it is, or maybe >> it would be better to split this to >> >> 1) assign windows to elements >> >> 2) combineByKeyAndWindow >> >> Jan >> On 6/13/19 3:51 PM, Robert Bradshaw wrote: >> >> I think the problem is that it never leverages the (traditionally much >> cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always >> calls CombineFn.mergeAccumulators(old_accumulator, >> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be >> feasible to fix this while still handling windowing correctly. (The >> end-of-window timestamp combiner could also be optimized because the >> timestamp need not be tracked throughout in that case.) >> >> On the other hand, once we move everything to portability, it's we'll >> probably toss all this code that use Spark's combiner lifting (instead >> using the GroupingCombiningTable that's implemented naively in Beam, as we >> do for Python to avoid fusion breaks). >> >> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote: >> >>> Hi, >>> >>> I have hit a performance issue with Spark runner, that seems to related >>> to its current Combine.perKey implementation. I'll try to summarize what >>> I have found in the code: >>> >>> - Combine.perKey uses Spark's combineByKey primitive, which is pretty >>> similar to the definition of CombineFn >>> >>> - it holds all elements as WindowedValues, and uses >>> Iterable<WindowedValue<Acc>> as accumulator (each WindowedValue holds >>> accumulated state for each window) >>> >>> - the update function is implemented as >>> >>> 1) convert value to Iterable<WindowedValue<Acc>> >>> >>> 2) merge accumulators for each windows >>> >>> The logic inside createAccumulator and mergeAccumulators is quite >>> non-trivial. The result of profiling is that two frames where the code >>> spends most of the time are: >>> >>> 41633930798 33.18% 4163 >>> >>> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners >>> 19990682441 15.93% 1999 >>> >>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable >>> >>> A simple change on code from >>> >>> PCollection<..> input = ... >>> >>> input.apply(Combine.perKey(...)) >>> >>> to >>> >>> PCollection<..> input = ... >>> >>> input >>> >>> .apply(GroupByKey.create()) >>> >>> .apply(Combine.groupedValues(...)) >>> >>> had drastical impact on the job run time (minutes as opposed to hours, >>> after which the first job didn't even finish!). >>> >>> I think I understand the reason why the current logic is implemented as >>> it is, it has to deal with merging windows. But the consequences seem to >>> be that it renders the implementation very inefficient. >>> >>> Has anyone seen similar behavior? Does my analysis of the problem seem >>> correct? >>> >>> Jan >>> >>> >>>
