Hi Robert,

there is a comment around that code which states that the current solution should be more efficient. I'd say that (for non-merging windows) it would be best to first explode windows and only then do combineByKey on (key, window); see the sketch after the list below. Merging windows would have to be handled the way they are now, or maybe it would be better to split this into

 1) assign windows to elements

 2) combineByKeyAndWindow
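
For non-merging windows, something like this is what I have in mind (just a rough sketch against Spark's JavaPairRDD API and Beam's CombineFn, not the runner's actual code; the method name and generics are mine):

  // Assumed imports: org.apache.spark.api.java.{JavaRDD, JavaPairRDD},
  // scala.Tuple2, org.apache.beam.sdk.transforms.Combine,
  // org.apache.beam.sdk.transforms.windowing.BoundedWindow,
  // org.apache.beam.sdk.util.WindowedValue, org.apache.beam.sdk.values.KV,
  // java.util.{ArrayList, Arrays, List}.
  static <K, InputT, AccumT> JavaPairRDD<Tuple2<K, BoundedWindow>, AccumT>
      combinePerKeyAndWindow(
          JavaRDD<WindowedValue<KV<K, InputT>>> input,
          Combine.CombineFn<InputT, AccumT, ?> combineFn) {

    // 1) assign windows to elements: one ((key, window), value) record per window
    JavaPairRDD<Tuple2<K, BoundedWindow>, InputT> exploded =
        input.flatMapToPair(wv -> {
          List<Tuple2<Tuple2<K, BoundedWindow>, InputT>> out = new ArrayList<>();
          for (BoundedWindow w : wv.getWindows()) {
            out.add(new Tuple2<>(
                new Tuple2<>(wv.getValue().getKey(), w),
                wv.getValue().getValue()));
          }
          return out.iterator();
        });

    // 2) combineByKey on (key, window): cheap addInput per element,
    //    mergeAccumulators only when Spark merges partial results
    return exploded.combineByKey(
        v -> combineFn.addInput(combineFn.createAccumulator(), v),
        combineFn::addInput,
        (a, b) -> combineFn.mergeAccumulators(Arrays.asList(a, b)));
  }

In reality the key and window would of course have to go through their coders to be usable as a Spark key, but that is a detail.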

Jan

On 6/13/19 3:51 PM, Robert Bradshaw wrote:
I think the problem is that it never leverages the (traditionally much cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always calls CombineFn.mergeAccumulators(old_accumulator, CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be feasible to fix this while still handling windowing correctly. (The end-of-window timestamp combiner could also be optimized because the timestamp need not be tracked throughout in that case.)
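
To illustrate (simplified, not the actual runner code; the method names and generics here are just for the example, assuming java.util.Arrays and org.apache.beam.sdk.transforms.Combine are imported):

  // cheap path: fold each new value directly into the existing accumulator
  static <InT, AccT> AccT addCheap(
      Combine.CombineFn<InT, AccT, ?> fn, AccT acc, InT value) {
    return fn.addInput(acc, value);
  }

  // what effectively happens today: a fresh one-element accumulator is built
  // and merged in, paying the full mergeAccumulators cost for every element
  static <InT, AccT> AccT addViaMerge(
      Combine.CombineFn<InT, AccT, ?> fn, AccT acc, InT value) {
    return fn.mergeAccumulators(
        Arrays.asList(acc, fn.addInput(fn.createAccumulator(), value)));
  }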

On the other hand, once we move everything to portability, we'll probably toss all this code that uses Spark's combiner lifting (instead using the GroupingCombiningTable that's implemented naively in Beam, as we do for Python to avoid fusion breaks).

On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:

    Hi,

    I have hit a performance issue with the Spark runner that seems to be
    related to its current Combine.perKey implementation. I'll try to
    summarize what I have found in the code:

      - Combine.perKey uses Spark's combineByKey primitive, which is pretty
    similar to the definition of CombineFn

      - it holds all elements as WindowedValues, and uses
    Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue holds
    the accumulated state for one window)

      - the update function is implemented as follows (a simplified sketch is
    below):

       1) convert the value to Iterable<WindowedValue<Acc>>

       2) merge accumulators for each window
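
    Roughly, in heavily simplified form (this is not the actual
    SparkKeyedCombineFn code; mergeWindowedAccumulators is just a placeholder
    for the window-matching logic, and the generics are illustrative):

      // the accumulator holds one WindowedValue<AccumT> per window seen so far
      Iterable<WindowedValue<AccumT>> mergeValue(
          Iterable<WindowedValue<AccumT>> accums,
          WindowedValue<KV<K, InputT>> value) {
        // 1) turn the value into a fresh one-element accumulator per window
        List<WindowedValue<AccumT>> single = new ArrayList<>();
        for (BoundedWindow w : value.getWindows()) {
          AccumT acc = combineFn.addInput(
              combineFn.createAccumulator(), value.getValue().getValue());
          single.add(WindowedValue.of(acc, value.getTimestamp(), w, value.getPane()));
        }
        // 2) merge it into the existing accumulators, window by window
        return mergeWindowedAccumulators(accums, single);  // placeholder helper
      }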

    The logic inside createAccumulator and mergeAccumulators is quite
    non-trivial. Profiling shows that the two frames where the code spends
    most of its time are:

      41633930798   33.18%     4163
        org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
      19990682441   15.93%     1999
        org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable

    A simple change in the code from

      PCollection<..> input = ...

      input.apply(Combine.perKey(...))

    to

      PCollection<..> input = ...

      input

        .apply(GroupByKey.create())

        .apply(Combine.groupedValues(...))

    had a drastic impact on the job run time (minutes, as opposed to hours
    after which the first job still hadn't finished!).

    I think I understand why the current logic is implemented the way it is:
    it has to deal with merging windows. But the consequence seems to be that
    it renders the implementation very inefficient.

    Has anyone seen similar behavior? Does my analysis of the problem seem
    correct?

    Jan

