Hi,

I have hit a performance issue with the Spark runner that seems to be related to its current Combine.perKey implementation. I'll try to summarize what I have found in the code:

 - Combine.perKey uses Spark's combineByKey primitive, which is pretty similar to the definition of CombineFn

 - it holds all elements as WindowedValues and uses Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue holds the accumulated state for a single window)

 - the update function is implemented as

  1) convert the incoming value to an Iterable<WindowedValue<Acc>>

  2) merge the accumulators for each window (a rough sketch of this step is below)
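
To make that concrete, my understanding of the merge step is roughly the following. This is a hand-written sketch with hypothetical names, not the actual SparkKeyedCombineFn code, and it ignores merging of the windows themselves, timestamp combining and pane info:

 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;

 // Sketch only: merge two per-key accumulators, each of which is a collection
 // of per-window partial accumulators, by grouping them by window and calling
 // CombineFn.mergeAccumulators whenever a window occurs on both sides.
 static <InT, AccT, OutT> List<WindowedValue<AccT>> mergeCombiners(
     CombineFn<InT, AccT, OutT> combineFn,
     Iterable<WindowedValue<AccT>> left,
     Iterable<WindowedValue<AccT>> right) {
   Map<BoundedWindow, WindowedValue<AccT>> byWindow = new HashMap<>();
   for (WindowedValue<AccT> wv : Iterables.concat(left, right)) {
     // each WindowedValue<AccT> carries the accumulator for exactly one window
     BoundedWindow window = Iterables.getOnlyElement(wv.getWindows());
     byWindow.merge(
         window,
         wv,
         (a, b) -> a.withValue(
             combineFn.mergeAccumulators(Arrays.asList(a.getValue(), b.getValue()))));
   }
   return new ArrayList<>(byWindow.values());
 }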

The logic inside createAccumulator and mergeAccumulators is quite non-trivial. Profiling shows that the two frames where the code spends most of its time are:

 41633930798   33.18%     4163 org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
 19990682441   15.93%     1999 org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable

A simple change of the code from

 PCollection<..> input = ...
 input.apply(Combine.perKey(...))

to

 PCollection<..> input = ...
 input
   .apply(GroupByKey.create())
   .apply(Combine.groupedValues(...))

had a drastic impact on the job run time (minutes, as opposed to hours after which the first job still hadn't finished!).
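
For completeness, the workaround spelled out with concrete types looks roughly like this (String/Long and Sum.ofLongs() are just stand-ins for illustration, not my actual key/value types or combine fn):

 PCollection<KV<String, Long>> input = ...;

 PCollection<KV<String, Long>> combined =
     input
         .apply(GroupByKey.<String, Long>create())
         .apply(Combine.groupedValues(Sum.ofLongs()));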

I think I understand why the current logic is implemented the way it is: it has to deal with merging windows. But the consequence seems to be a very inefficient implementation.
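
To be clear, by merging windows I mean running downstream of a merging WindowFn such as sessions, where accumulators created for separate windows may later have to be merged when the windows themselves merge. A minimal example of a pipeline that hits this case (illustration only, not my actual job):

 PCollection<KV<String, Long>> events = ...;

 PCollection<KV<String, Long>> perSession =
     events
         .apply(Window.<KV<String, Long>>into(
             Sessions.withGapDuration(Duration.standardMinutes(10))))
         .apply(Combine.perKey(Sum.ofLongs()));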

Has anyone seen similar behavior? Does my analysis of the problem seem correct?

Jan

