I think the problem is that it never leverages the (traditionally much cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always calls CombineFn.mergeAccumulators(old_accumulator, CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be feasible to fix this while still handling windowing correctly. (The end-of-window timestamp combiner could also be optimized because the timestamp need not be tracked throughout in that case.)
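In code, the per-element difference is roughly the following (a sketch against the CombineFn API only, not the runner's actual code; fn, acc and value are placeholders for a CombineFn<InputT, AccumT, OutputT>, its running accumulator, and an incoming element):

  // What the Spark runner effectively does for every element today:
  // build a singleton accumulator, then merge it into the running one.
  AccumT singleton = fn.addInput(fn.createAccumulator(), value);
  acc = fn.mergeAccumulators(java.util.Arrays.asList(acc, singleton));

  // What it could do instead, typically much cheaper:
  acc = fn.addInput(acc, value);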
On the other hand, once we move everything to portability, we'll probably toss all this code that uses Spark's combiner lifting (instead using the GroupingCombiningTable that's implemented naively in Beam, as we do for Python to avoid fusion breaks).

On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
> Hi,
>
> I have hit a performance issue with the Spark runner that seems to be
> related to its current Combine.perKey implementation. I'll try to
> summarize what I have found in the code:
>
> - Combine.perKey uses Spark's combineByKey primitive, which is pretty
> similar to the definition of CombineFn
>
> - it holds all elements as WindowedValues, and uses
> Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue
> holds the accumulated state for one window)
>
> - the update function is implemented as
>
> 1) convert the value to Iterable<WindowedValue<Acc>>
>
> 2) merge the accumulators for each window
>
> The logic inside createAccumulator and mergeAccumulators is quite
> non-trivial. Profiling shows that the two frames where the code spends
> most of its time are:
>
> 41633930798 33.18% 4163
> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
>
> 19990682441 15.93% 1999
> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
>
> A simple change of the code from
>
> PCollection<..> input = ...
>
> input.apply(Combine.perKey(...))
>
> to
>
> PCollection<..> input = ...
>
> input
>   .apply(GroupByKey.create())
>   .apply(Combine.groupedValues(...))
>
> had a drastic impact on the job run time (minutes as opposed to hours,
> after which the first job didn't even finish!).
>
> I think I understand why the current logic is implemented the way it is
> - it has to deal with merging windows. But the consequence seems to be
> that the implementation is very inefficient.
>
> Has anyone seen similar behavior? Does my analysis of the problem seem
> correct?
>
> Jan
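To make the quoted analysis concrete: Spark's combineByKey takes three functions (createCombiner, mergeValue, mergeCombiners), and the fix I'm suggesting amounts to having the mergeValue slot call addInput directly instead of building a fresh accumulator and merging. A rough sketch, deliberately ignoring the per-window Iterable<WindowedValue<Acc>> bookkeeping that the real SparkKeyedCombineFn has to do; input and fn are placeholders for a JavaPairRDD<K, InputT> and a CombineFn<InputT, AccumT, OutputT>:

  JavaPairRDD<K, AccumT> accumulated =
      input.combineByKey(
          value -> fn.addInput(fn.createAccumulator(), value),          // createCombiner
          (acc, value) -> fn.addInput(acc, value),                      // mergeValue: the cheap path
          (a, b) -> fn.mergeAccumulators(java.util.Arrays.asList(a, b))); // mergeCombiners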
