Hi Robert,
there is a comment around that which states that the current solution should be more efficient. I'd say that (for non-merging windows) it would be best to first explode windows, and only after that do combineByKey(key & window). Merging windows would have to be handled the way they are now, or maybe it would be better to split this into
1) assign windows to elements
2) combineByKeyAndWindow
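For non-merging windows, a rough sketch of what 1) + 2) could look like on the Spark side (everything below is illustrative, not existing runner code; coders, timestamps and triggers are glossed over, and the key and window types are assumed to be serializable by Spark):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

class ExplodedCombinePerKey {  // illustrative name

  // 1) assign windows to elements: emit one record per (key, window) pair
  static <K, V> JavaPairRDD<KV<K, BoundedWindow>, V> explodeWindows(
      JavaRDD<WindowedValue<KV<K, V>>> input) {
    PairFlatMapFunction<WindowedValue<KV<K, V>>, KV<K, BoundedWindow>, V> explode =
        wv -> {
          List<Tuple2<KV<K, BoundedWindow>, V>> out = new ArrayList<>();
          for (BoundedWindow window : wv.getWindows()) {
            out.add(new Tuple2<>(
                KV.of(wv.getValue().getKey(), window), wv.getValue().getValue()));
          }
          return out.iterator();
        };
    return input.flatMapToPair(explode);
  }

  // 2) combineByKeyAndWindow: plain Spark combineByKey on the composite key
  static <K, V, AccumT, OutputT> JavaPairRDD<KV<K, BoundedWindow>, AccumT>
      combineByKeyAndWindow(
          JavaPairRDD<KV<K, BoundedWindow>, V> exploded,
          CombineFn<V, AccumT, OutputT> fn) {
    Function<V, AccumT> createCombiner =
        value -> fn.addInput(fn.createAccumulator(), value);
    Function2<AccumT, V, AccumT> mergeValue =
        (acc, value) -> fn.addInput(acc, value);
    Function2<AccumT, AccumT, AccumT> mergeCombiners =
        (a, b) -> fn.mergeAccumulators(Arrays.asList(a, b));
    return exploded.combineByKey(createCombiner, mergeValue, mergeCombiners);
  }
}

With the window folded into the key, Spark's map-side combine can call addInput for nearly every element, and mergeAccumulators is only needed where partial accumulators from different partitions meet during the shuffle.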
Jan
On 6/13/19 3:51 PM, Robert Bradshaw wrote:
I think the problem is that it never leverages the (traditionally much cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always calls CombineFn.mergeAccumulators(old_accumulator, CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be feasible to fix this while still handling windowing correctly. (The end-of-window timestamp combiner could also be optimized because the timestamp need not be tracked throughout in that case.)
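For illustration, the two call patterns look roughly like this against the CombineFn API (the helper class and loops below are just a sketch, not actual runner code):

import java.util.Arrays;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

class CombineCallPatterns {  // illustrative name

  // Cheap path: fold every new value directly into the running accumulator.
  static <InputT, AccumT, OutputT> AccumT viaAddInput(
      CombineFn<InputT, AccumT, OutputT> fn, Iterable<InputT> values) {
    AccumT acc = fn.createAccumulator();
    for (InputT value : values) {
      acc = fn.addInput(acc, value);
    }
    return acc;
  }

  // Expensive path: wrap every value in a fresh single-element accumulator
  // and merge it into the running accumulator.
  static <InputT, AccumT, OutputT> AccumT viaMergeAccumulators(
      CombineFn<InputT, AccumT, OutputT> fn, Iterable<InputT> values) {
    AccumT acc = fn.createAccumulator();
    for (InputT value : values) {
      AccumT single = fn.addInput(fn.createAccumulator(), value);
      acc = fn.mergeAccumulators(Arrays.asList(acc, single));
    }
    return acc;
  }
}

For a CombineFn with a non-trivial accumulator, the second pattern allocates and merges a full accumulator per input element, which is where the cost goes.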
On the other hand, once we move everything to portability, we'll probably toss all this code that uses Spark's combiner lifting (instead using the GroupingCombiningTable that's implemented naively in Beam, as we do for Python to avoid fusion breaks).
On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
Hi,
I have hit a performance issue with the Spark runner that seems to be related to its current Combine.perKey implementation. I'll try to summarize what I have found in the code:
- Combine.perKey uses Spark's combineByKey primitive, which is pretty similar to the definition of CombineFn
- it holds all elements as WindowedValues, and uses Iterable<WindowedValue<Acc>> as accumulator (each WindowedValue holds the accumulated state for one window)
- the update function is implemented as follows (roughly sketched below)
1) convert value to Iterable<WindowedValue<Acc>>
2) merge accumulators for each window
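To make the shape of that concrete, a heavily simplified schematic of the per-element update (this is not the actual SparkKeyedCombineFn code; the class and helper names are made up, and timestamp combining and pane handling are ignored):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;

class PerWindowAccumulatorUpdate {  // illustrative only

  // Fold one input element into the per-window accumulators. Every element
  // goes through createAccumulator + addInput + mergeAccumulators.
  static <InputT, AccumT, OutputT> List<WindowedValue<AccumT>> update(
      CombineFn<InputT, AccumT, OutputT> fn,
      List<WindowedValue<AccumT>> accumulators,   // one entry per window
      WindowedValue<InputT> element) {
    List<WindowedValue<AccumT>> result = new ArrayList<>(accumulators);
    for (BoundedWindow window : element.getWindows()) {
      // 1) convert the value into a single-element accumulator for this window
      AccumT single = fn.addInput(fn.createAccumulator(), element.getValue());
      int i = indexOfWindow(result, window);
      if (i < 0) {
        result.add(WindowedValue.of(
            single, element.getTimestamp(), window, element.getPane()));
      } else {
        // 2) merge it with the accumulator already held for that window
        WindowedValue<AccumT> existing = result.get(i);
        AccumT merged =
            fn.mergeAccumulators(Arrays.asList(existing.getValue(), single));
        result.set(i, WindowedValue.of(
            merged, existing.getTimestamp(), window, existing.getPane()));
      }
    }
    return result;
  }

  static int indexOfWindow(
      List<? extends WindowedValue<?>> accs, BoundedWindow window) {
    for (int i = 0; i < accs.size(); i++) {
      if (accs.get(i).getWindows().contains(window)) {
        return i;
      }
    }
    return -1;
  }
}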
The logic inside createAccumulator and mergeAccumulators is quite non-trivial. Profiling shows that the two frames where the code spends most of its time are:

41633930798 33.18% 4163 org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
19990682441 15.93% 1999 org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
A simple change of the code from

  PCollection<..> input = ...
  input.apply(Combine.perKey(...))

to

  PCollection<..> input = ...
  input
      .apply(GroupByKey.create())
      .apply(Combine.groupedValues(...))

had a drastic impact on the job run time (minutes as opposed to hours, after which the first job didn't even finish!).
I think I understand why the current logic is implemented the way it is: it has to deal with merging windows. But the consequence seems to be that it renders the implementation very inefficient.
Has anyone seen similar behavior? Does my analysis of the problem seem correct?
Jan