Hi,
I have hit a performance issue with the Spark runner that seems to be
related to its current Combine.perKey implementation. I'll try to
summarize what I have found in the code:
- Combine.perKey uses Spark's combineByKey primitive, which is pretty
similar to the definition of CombineFn
- it holds all elements as WindowedValues and uses
Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue holds
the accumulated state for a single window)
- the update function is implemented as
1) convert the value to an Iterable<WindowedValue<Acc>>
2) merge the accumulators for each window (roughly sketched below)
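To make that structure concrete, here is a minimal, self-contained sketch
of how I read the per-window merge. WindowedAcc, merge() and the plain
String windows are my own hypothetical simplifications, not the runner's
actual classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for WindowedValue<Acc>: one accumulator per window.
final class WindowedAcc<W, A> {
  final W window;
  final A acc;
  WindowedAcc(W window, A acc) {
    this.window = window;
    this.acc = acc;
  }
}

public class PerWindowMergeSketch {

  // Merge two flat lists of per-window accumulators: accumulators that
  // share a window are combined, the rest are carried over unchanged.
  // Note the nested scan over the result list for every right-side element.
  static <W, A> List<WindowedAcc<W, A>> merge(
      List<WindowedAcc<W, A>> left,
      List<WindowedAcc<W, A>> right,
      BinaryOperator<A> combine) {
    List<WindowedAcc<W, A>> result = new ArrayList<>(left);
    for (WindowedAcc<W, A> r : right) {
      boolean merged = false;
      for (int i = 0; i < result.size(); i++) {
        WindowedAcc<W, A> l = result.get(i);
        if (l.window.equals(r.window)) {
          result.set(i, new WindowedAcc<>(l.window, combine.apply(l.acc, r.acc)));
          merged = true;
          break;
        }
      }
      if (!merged) {
        result.add(r);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Toy example: summing longs accumulated in two windows "w1" and "w2".
    List<WindowedAcc<String, Long>> a =
        Arrays.asList(new WindowedAcc<>("w1", 3L), new WindowedAcc<>("w2", 5L));
    List<WindowedAcc<String, Long>> b =
        Arrays.asList(new WindowedAcc<>("w1", 4L));
    for (WindowedAcc<String, Long> wa : merge(a, b, Long::sum)) {
      System.out.println(wa.window + " -> " + wa.acc); // w1 -> 7, w2 -> 5
    }
  }
}

If something along these lines happens once per input element on a hot
key, that would be consistent with the mergeCombiners and
Iterables.unmodifiableIterable frames dominating the profile below.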
The logic inside createAccumulator and mergeAccumulators is quite
non-trivial. Profiling shows that the two frames where the code spends
most of its time are:
41633930798 33.18% 4163
org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
19990682441 15.93% 1999
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
A simple change in the code, from

PCollection<..> input = ...
input.apply(Combine.perKey(...))

to

PCollection<..> input = ...
input
    .apply(GroupByKey.create())
    .apply(Combine.groupedValues(...))

had a drastic impact on the job run time (minutes, as opposed to hours
after which the first job still had not finished).
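For completeness, this is roughly what the two variants look like as a
runnable pipeline. Sum.longsPerKey()/Sum.ofLongs() and the toy input are
placeholders for my actual CombineFn and data, and the class and step
names are made up:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CombineVariants {
  public static void main(String[] args) {
    // Pass e.g. --runner=SparkRunner (with beam-runners-spark on the
    // classpath) to run this on the Spark runner.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, Long>> input =
        p.apply(Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L)));

    // Variant 1: Combine.perKey -- the slow path in my job.
    PCollection<KV<String, Long>> viaCombinePerKey =
        input.apply("SumPerKey", Sum.longsPerKey());

    // Variant 2: the workaround -- group first, then combine the grouped values.
    PCollection<KV<String, Long>> viaGroupedValues =
        input
            .apply("Group", GroupByKey.create())
            .apply("SumGrouped", Combine.groupedValues(Sum.ofLongs()));

    p.run().waitUntilFinish();
  }
}

Both variants should of course produce the same sums; only the runtime
differed for me.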
I think I understand why the current logic is implemented the way it is:
it has to be able to deal with merging windows. But the consequence seems
to be a very inefficient implementation.
Has anyone seen similar behavior? Does my analysis of the problem seem
correct?
Jan