The comment fails to take into account the asymmetry between calling
addInput vs. mergeAccumulators. It also focuses a lot on the asymptotic
behavior, when the most common case is likely a single (global) window.

Were I to implement this I would let the accumulator be a hashmap Window ->
Value/Timestamp. For the non-merging case, when a WindowedValue with N
windows comes in, simply do O(N) lookup + addInput calls. Merging these
accumulator hashmaps is pretty easy. For merging windows, I would first
invoke the WindowFn to determine which old windows + new windows merge into
bigger windows, and then construct the values of the bigger windows with the
appropriate createAccumulator, addInput, and/or mergeAccumulators calls,
depending on how many old vs. new values there are. This is a merging call
plus O(N) additions.
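
Roughly, such an accumulator could look like the following sketch (an
arbitrary CombineFn<InputT, AccumT, ?>; the per-window timestamp is
omitted, and the class is hypothetical, not the actual Spark runner code):

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.util.WindowedValue;

  // Hypothetical helper: one accumulator per window, as proposed above.
  class PerWindowAccumulator<InputT, AccumT> {
    private final CombineFn<InputT, AccumT, ?> combineFn;
    private final Map<BoundedWindow, AccumT> accumMap = new HashMap<>();

    PerWindowAccumulator(CombineFn<InputT, AccumT, ?> combineFn) {
      this.combineFn = combineFn;
    }

    // Non-merging case: an element in N windows costs O(N) lookup + addInput calls.
    void add(WindowedValue<InputT> value) {
      for (BoundedWindow window : value.getWindows()) {
        AccumT acc = accumMap.get(window);
        if (acc == null) {
          acc = combineFn.createAccumulator();
        }
        accumMap.put(window, combineFn.addInput(acc, value.getValue()));
      }
    }

    // Combining two partial results: per window, keep one side or mergeAccumulators.
    void mergeWith(PerWindowAccumulator<InputT, AccumT> other) {
      for (Map.Entry<BoundedWindow, AccumT> e : other.accumMap.entrySet()) {
        AccumT mine = accumMap.get(e.getKey());
        accumMap.put(
            e.getKey(),
            mine == null
                ? e.getValue()
                : combineFn.mergeAccumulators(Arrays.asList(mine, e.getValue())));
      }
    }
  }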

BTW, the code is broken because it hard codes SessionWindows rather than
calling WindowFn.mergeWindows(...). This is a correctness, not just a
performance, bug :(.
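
Delegating the merge decision to the WindowFn could look roughly like this
(a sketch only; the helper class is hypothetical, and the bookkeeping of
which accumulators then get combined is elided):

  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.transforms.windowing.WindowFn;

  // Hypothetical helper: ask the WindowFn which of the currently known
  // windows merge, instead of assuming Sessions semantics.
  class MergeViaWindowFn {
    static <T, W extends BoundedWindow> Map<W, Collection<W>> computeMerges(
        WindowFn<T, W> windowFn, Collection<W> currentWindows) throws Exception {
      Map<W, Collection<W>> merges = new HashMap<>();
      windowFn.mergeWindows(
          windowFn.new MergeContext() {
            @Override
            public Collection<W> windows() {
              return currentWindows;
            }

            @Override
            public void merge(Collection<W> toBeMerged, W mergeResult) {
              // Remember result window -> the windows whose accumulators
              // must be combined into it.
              merges.put(mergeResult, new ArrayList<>(toBeMerged));
            }
          });
      return merges;
    }
  }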



On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:

> Hi Robert,
>
> there is a comment around that which states that the current solution
> should be more efficient. I'd say that (for non-merging windows) it would
> be best to first explode windows, and only after that do combineByKey(key &
> window). Merging windows would have to be handled the way it is, or maybe
> it would be better to split this into
>
>  1) assign windows to elements
>
>  2) combineByKeyAndWindow
>
> Jan
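
For the non-merging case, that explode-then-combineByKey(key & window) idea
could look roughly like the following on the Spark side (a sketch only; the
helper class, the JavaRDD<WindowedValue<KV<K, InputT>>> input shape, and the
omission of timestamps and coders are all assumptions):

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.util.WindowedValue;
  import org.apache.beam.sdk.values.KV;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import scala.Tuple2;

  // Hypothetical helper: explode windows, key by (key, window), and let
  // Spark's combineByKey drive the CombineFn primitives directly.
  class CombinePerKeyAndWindow {
    static <K, InputT, AccumT> JavaPairRDD<Tuple2<K, BoundedWindow>, AccumT> apply(
        JavaRDD<WindowedValue<KV<K, InputT>>> input, CombineFn<InputT, AccumT, ?> combineFn) {
      JavaPairRDD<Tuple2<K, BoundedWindow>, InputT> exploded =
          input.flatMapToPair(
              wv -> {
                List<Tuple2<Tuple2<K, BoundedWindow>, InputT>> out = new ArrayList<>();
                for (BoundedWindow w : wv.getWindows()) {
                  out.add(new Tuple2<>(
                      new Tuple2<>(wv.getValue().getKey(), w), wv.getValue().getValue()));
                }
                return out.iterator();
              });
      return exploded.<AccumT>combineByKey(
          value -> combineFn.addInput(combineFn.createAccumulator(), value),
          (acc, value) -> combineFn.addInput(acc, value),
          (a, b) -> combineFn.mergeAccumulators(Arrays.asList(a, b)));
    }
  }
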
> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
>
> I think the problem is that it never leverages the (traditionally much
> cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it always
> calls CombineFn.mergeAccumulators(old_accumulator,
> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be
> feasible to fix this while still handling windowing correctly. (The
> end-of-window timestamp combiner could also be optimized because the
> timestamp need not be tracked throughout in that case.)
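
In code, the difference between the two update paths is roughly the
following (a hypothetical fragment for an arbitrary
CombineFn<InputT, AccumT, ?>, not the actual runner code):

  import java.util.Arrays;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;

  // Hypothetical illustration: the two ways of folding one new element
  // into an existing accumulator.
  class UpdatePaths {
    static <InputT, AccumT> void illustrate(
        CombineFn<InputT, AccumT, ?> combineFn, AccumT acc, InputT value) {
      // What happens today: wrap the element in a fresh accumulator, then merge.
      AccumT viaMerge =
          combineFn.mergeAccumulators(
              Arrays.asList(acc, combineFn.addInput(combineFn.createAccumulator(), value)));

      // The traditionally much cheaper path: add the element directly.
      AccumT viaAddInput = combineFn.addInput(acc, value);
    }
  }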
>
> On the other hand, once we move everything to portability, we'll
> probably toss all this code that uses Spark's combiner lifting (instead
> using the GroupingCombiningTable that's implemented naively in Beam, as we
> do for Python to avoid fusion breaks).
>
> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
>
>> Hi,
>>
>> I have hit a performance issue with the Spark runner that seems to be
>> related to its current Combine.perKey implementation. I'll try to summarize
>> what I have found in the code:
>>
>>   - Combine.perKey uses Spark's combineByKey primitive, which is pretty
>> similar to the definition of CombineFn
>>
>>   - it holds all elements as WindowedValues, and uses
>> Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue holds
>> the accumulated state for one window)
>>
>>   - the update function is implemented as
>>
>>    1) convert value to Iterable<WindowedValue<Acc>>
>>
>>    2) merge accumulators for each window
>>
>> The logic inside createAccumulator and mergeAccumulators is quite
>> non-trivial. Profiling shows that the two frames where the code
>> spends most of its time are:
>>
>>   41633930798   33.18%     4163
>>
>> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
>>   19990682441   15.93%     1999
>>
>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
>>
>> A simple change on code from
>>
>>   PCollection<..> input = ...
>>
>>   input.apply(Combine.perKey(...))
>>
>> to
>>
>>   PCollection<..> input = ...
>>
>>   input
>>
>>     .apply(GroupByKey.create())
>>
>>     .apply(Combine.groupedValues(...))
>>
>> had a drastic impact on the job run time (minutes, as opposed to hours
>> after which the first job didn't even finish!).
>>
>> I think I understand the reason why the current logic is implemented as
>> it is: it has to deal with merging windows. But the consequence seems to
>> be that it renders the implementation very inefficient.
>>
>> Has anyone seen similar behavior? Does my analysis of the problem seem
>> correct?
>>
>> Jan
>>
>>
>>
