I think the problem is that it never leverages the (traditionally much
cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always
calls CombineFn.mergeAccumulators(old_accumulator,
CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be
feasible to fix this while still handling windowing correctly. (The
end-of-window timestamp combiner could also be optimized because the
timestamp need not be tracked throughout in that case.)
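
For illustration, here's a minimal sketch (not the runner's actual code) of
the two call patterns, written against Beam's Combine.CombineFn API:

  import java.util.Arrays;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;

  class CombinePatterns {
    // What the runner effectively does today: wrap each element in a fresh
    // accumulator and then merge, paying for createAccumulator +
    // mergeAccumulators on every element.
    static <InputT, AccumT> AccumT addViaMerge(
        CombineFn<InputT, AccumT, ?> fn, AccumT oldAccumulator, InputT newValue) {
      AccumT singleton = fn.addInput(fn.createAccumulator(), newValue);
      return fn.mergeAccumulators(Arrays.asList(oldAccumulator, singleton));
    }

    // The cheaper path: fold the new value directly into the existing
    // accumulator.
    static <InputT, AccumT> AccumT addDirectly(
        CombineFn<InputT, AccumT, ?> fn, AccumT oldAccumulator, InputT newValue) {
      return fn.addInput(oldAccumulator, newValue);
    }
  }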

On the other hand, once we move everything to portability, we'll probably
toss all this code that uses Spark's combiner lifting (instead using the
GroupingCombiningTable that's implemented naively in Beam, as we do for
Python to avoid fusion breaks).
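
For reference, the grouping table idea is roughly: keep a bounded in-memory
map from key to accumulator, fold values in with addInput, and flush partial
accumulators downstream when the table grows too large. A hypothetical sketch
follows (class name and sizing policy are illustrative, not the actual Beam
classes):

  import java.util.HashMap;
  import java.util.Map;
  import java.util.function.BiConsumer;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;

  class NaiveGroupingTable<K, InputT, AccumT> {
    private final CombineFn<InputT, AccumT, ?> combineFn;
    private final int maxKeys;
    private final Map<K, AccumT> table = new HashMap<>();

    NaiveGroupingTable(CombineFn<InputT, AccumT, ?> combineFn, int maxKeys) {
      this.combineFn = combineFn;
      this.maxKeys = maxKeys;
    }

    // Fold a value into the accumulator for its key; flush if the table is full.
    void add(K key, InputT value, BiConsumer<K, AccumT> emit) {
      AccumT acc = table.computeIfAbsent(key, k -> combineFn.createAccumulator());
      table.put(key, combineFn.addInput(acc, value));
      if (table.size() > maxKeys) {
        flush(emit);
      }
    }

    // Emit all partial accumulators downstream and clear the table.
    void flush(BiConsumer<K, AccumT> emit) {
      table.forEach(emit);
      table.clear();
    }
  }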

On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:

> Hi,
>
> I have hit a performance issue with the Spark runner, which seems to be
> related to its current Combine.perKey implementation. I'll try to summarize
> what I have found in the code:
>
>   - Combine.perKey uses Spark's combineByKey primitive, which is pretty
> similar to the definition of CombineFn
>
>   - it holds all elements as WindowedValues, and uses
> Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue holds
> the accumulated state for one window)
>
>   - the update function is implemented as
>
>    1) convert value to Iterable<WindowedValue<Acc>>
>
>    2) merge accumulators for each window
>
> The logic inside createAccumulator and mergeAccumulators is quite
> non-trivial. Profiling shows that the two frames where the code spends most
> of its time are:
>
>   41633930798   33.18%   4163   org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
>   19990682441   15.93%   1999   org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
>
> A simple change in the code from
>
>   PCollection<..> input = ...
>
>   input.apply(Combine.perKey(...))
>
> to
>
>   PCollection<..> input = ...
>
>   input
>
>     .apply(GroupByKey.create())
>
>     .apply(Combine.groupedValues(...))
>
> had a drastic impact on the job run time (minutes, as opposed to hours
> after which the first job still hadn't finished!).
>
> I think I understand why the current logic is implemented as it is: it has
> to deal with merging windows. But the consequence seems to be a very
> inefficient implementation.
>
> Has anyone seen similar behavior? Does my analysis of the problem seem
> correct?
>
> Jan
>
>
>
