On 6/13/19 6:10 PM, Robert Bradshaw wrote:
On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]> wrote:

    On 6/13/19 4:31 PM, Robert Bradshaw wrote:
    The comment fails to take into account the asymmetry between
    calling addInput vs. mergeAccumulators. It also focuses a lot on
    the asymptotic behavior, when the most common behavior is likely
    having a single (global) window.

    Yes, that occurred to me too. There are a few more questions here:

     a) it would help if WindowedValue#explodeWindows returned List
    instead of Iterable, because then optimizations would be possible
    based on the number of windows (e.g. when there is only a single
    window, there is no need to sort anything). This should be a simple
    change, as the underlying value already is a List.

Well, I'm wary of this change, but we could always create a list out of it (via cast or ImmutableList.copyOf) if we needed.
Why so? I thought this would be the least objectionable change, because the value actually *is* a List, and there is no interface involved; it is just a public method whose return type needs to be changed to state that fact correctly. A Collection would work just as well. Iterable is for cases where you don't know whether the data is stored in memory or loaded from somewhere else, and hence the size cannot be determined in advance. This is not the case for WindowFn.
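
For concreteness, a rough (untested) sketch of the materialization Robert mentions, assuming the value is a WindowedValue<T>:

  // Sketch only: branch on the window count. ImmutableList.copyOf returns the
  // same instance for an immutable list and copies otherwise.
  Iterable<WindowedValue<T>> exploded = value.explodeWindows();
  List<WindowedValue<T>> windows =
      (exploded instanceof List)
          ? (List<WindowedValue<T>>) exploded
          : ImmutableList.copyOf(exploded);
  if (windows.size() == 1) {
    // fast path: a single window, nothing to sort or group
  }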

     b) I'm a little confused why it is better in general to keep a key
    with all its windows in a single WindowedValue. My first thought
    would really be: explode all windows to (key, window) pairs and use
    these as keys as much as you can. There is an obvious small
    drawback: this might be less efficient for windowing strategies with
    a high number of windows per element (sliding windows with a small
    slide compared to the window length). But that information could be
    exposed on the WindowFn and the decision made accordingly.

The optimization is for sliding windows, where it's more efficient to send the value with all its windows and explode after the shuffle rather than duplicate the value before it. Of course exploding up front gives up that ability, in the hope of being able to reduce the size more by doing combining. (A half-way approach would be to group on the unexploded window set, which sounds worse generically but isn't bad in practice.)
I'd say that adding something like WindowFn.isAssigningToSingleWindow() would solve all these nuances.
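
A rough sketch of the exploded variant (isAssigningToSingleWindow() is hypothetical and does not exist on WindowFn today; "input" stands for a JavaRDD<WindowedValue<KV<K, V>>>):

  // Guarded by the hypothetical windowFn.isAssigningToSingleWindow() check:
  // explode each element into (key, window) pairs and let Spark combine on
  // that composite key.
  JavaPairRDD<KV<K, BoundedWindow>, V> byKeyAndWindow =
      input.flatMapToPair(wv -> {
        List<Tuple2<KV<K, BoundedWindow>, V>> out = new ArrayList<>();
        for (WindowedValue<KV<K, V>> e : wv.explodeWindows()) {
          BoundedWindow w = Iterables.getOnlyElement(e.getWindows());
          KV<K, V> kv = e.getValue();
          out.add(new Tuple2<>(KV.of(kv.getKey(), w), kv.getValue()));
        }
        return out.iterator();
      });
  // followed by a plain combineByKey(createAccumulator, addInput, mergeAccumulators)
  // on byKeyAndWindow, with no per-element accumulator wrapping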

    Were I to implement this I would let the accumulator be a hashmap
    Window -> Value/Timestamp. For the non-merging case, when a
    WindowedValue with N windows comes in, simply do O(N)
    lookup+addInput calls. Merging these accumulator hashmaps is
    pretty easy. For merging windows, I would first invoke the
    WindowFn to determine which old + new windows merge into
    bigger windows, and then construct the values of the bigger
    windows with the appropriate createAccumulator, addInput, and/or
    mergeAccumulators calls, depending on how many old vs. new values
    there are. This is a merging call + O(N) additions.
    Yep, that sounds like it could work. For a single window (global,
    tumbling) the approach above would probably still be more efficient.


Yes, for the global window one could do away with the (single-valued) hashmap. Sessions.mergeWindows() does the tumbling to figure out which windows to merge, so that'd be just as efficient.
+1
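
To make that concrete, a minimal sketch of the per-window accumulator layout for a non-merging WindowFn (illustrative fragment, timestamp combining omitted):

  // accumulator: one CombineFn accumulator per window
  Map<BoundedWindow, AccumT> accums = new HashMap<>();

  // addInput: O(number of windows of the element) lookups + cheap addInput calls
  void add(WindowedValue<InputT> element, CombineFn<InputT, AccumT, ?> fn) {
    for (BoundedWindow window : element.getWindows()) {
      AccumT acc = accums.computeIfAbsent(window, w -> fn.createAccumulator());
      accums.put(window, fn.addInput(acc, element.getValue()));
    }
  }

  // merging two of these maps (what Spark's mergeCombiners would do)
  Map<BoundedWindow, AccumT> mergeMaps(Map<BoundedWindow, AccumT> a,
                                       Map<BoundedWindow, AccumT> b,
                                       CombineFn<?, AccumT, ?> fn) {
    for (Map.Entry<BoundedWindow, AccumT> e : b.entrySet()) {
      a.merge(e.getKey(), e.getValue(),
          (x, y) -> fn.mergeAccumulators(Arrays.asList(x, y)));
    }
    return a;
  }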


    BTW, the code is broken because it hard codes SessionWindows
    rather than calling WindowFn.mergeWindows(...). This is a
    correctness, not just a performance, bug :(.
    Can you point me out in the code?


E.g. https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106 always merges things that are intersecting, rather than querying WindowFn.mergeWindows to determine which, if any, should be merged.
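
For reference, a correct merge would ask the WindowFn itself which windows to combine, roughly like this (sketch only; W is the window type of the WindowFn, "accumulatedWindows" stands for the windows of the accumulators seen so far, and the surrounding bookkeeping is elided):

  // Only meaningful for merging WindowFns (guard with !windowFn.isNonMerging()).
  List<W> currentWindows = new ArrayList<>(accumulatedWindows);
  Map<W, W> windowToMergeResult = new HashMap<>();
  try {
    windowFn.mergeWindows(windowFn.new MergeContext() {
      @Override
      public Collection<W> windows() {
        return currentWindows;
      }

      @Override
      public void merge(Collection<W> toBeMerged, W mergeResult) {
        for (W w : toBeMerged) {
          windowToMergeResult.put(w, mergeResult);
        }
      }
    });
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
  // accumulators whose windows map to the same mergeResult are then combined
  // via CombineFn.mergeAccumulators(...); windows not in the map stay as they are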

    On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:

        Hi Robert,

        there is a comment around that code which states that the current
        solution should be more efficient. I'd say that (for non-merging
        windows) it would be best to first explode the windows, and only
        after that do combineByKey on (key, window). Merging windows would
        have to be handled the way it is now, or maybe it would be better
        to split this into

         1) assign windows to elements

         2) combineByKeyAndWindow

        Jan

        On 6/13/19 3:51 PM, Robert Bradshaw wrote:
        I think the problem is that it never leverages the
        (traditionally much cheaper)
        CombineFn.addInput(old_accumulator, new_value). Instead, it
        always calls CombineFn.mergeAccumulators(old_accumulator,
        CombineFn.addInput(CombineFn.createAccumulator(),
        new_value)). It should be feasible to fix this while still
        handling windowing correctly. (The end-of-window timestamp
        combiner could also be optimized because the timestamp need
        not be tracked throughout in that case.)
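
        In code, the difference per input element is roughly (sketch):

          // what happens today: every value gets wrapped in a fresh
          // accumulator and then paid for with a full merge
          accum = combineFn.mergeAccumulators(Arrays.asList(
              accum, combineFn.addInput(combineFn.createAccumulator(), value)));

          // the cheaper path that is never taken: fold the value straight in
          accum = combineFn.addInput(accum, value);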

        On the other hand, once we move everything to portability, we'll
        probably toss all this code that uses Spark's combiner lifting
        (instead using the GroupingCombiningTable that's implemented
        naively in Beam, as we do for Python to avoid fusion breaks).

        On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:

            Hi,

            I have hit a performance issue with the Spark runner that
            seems to be related to its current Combine.perKey
            implementation. I'll try to summarize what I have found in
            the code:

              - Combine.perKey uses Spark's combineByKey primitive,
            which is pretty
            similar to the definition of CombineFn

              - it holds all elements as WindowedValues and uses
            Iterable<WindowedValue<Acc>> as the accumulator (each
            WindowedValue in the Iterable holds the accumulated state
            for one window)

              - the update function is implemented as (see the sketch below)

               1) convert the value to an Iterable<WindowedValue<Acc>>

               2) merge accumulators for each window
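
            Schematically (simplified; the real code is
            SparkKeyedCombineFn, the function names below just mirror
            Spark's combineByKey arguments, and "byKey" stands for the
            keyed RDD of WindowedValues):

              JavaPairRDD<K, Iterable<WindowedValue<AccumT>>> combined =
                  byKey.combineByKey(
                      value -> createCombiner(value),   // wrap value in a fresh accumulator
                      (accums, value) -> mergeValue(accums, value),   // steps 1) + 2) above
                      (a, b) -> mergeCombiners(a, b));  // merge two accumulator sets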

            The logic inside createAccumulator and mergeAccumulators is
            quite non-trivial. Profiling shows that the two frames where
            the code spends most of its time are:

              41633930798   33.18%     4163
                org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
              19990682441   15.93%     1999
                org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable

            A simple change of the code from

              PCollection<..> input = ...

              input.apply(Combine.perKey(...))

            to

              PCollection<..> input = ...

              input

                .apply(GroupByKey.create())

                .apply(Combine.groupedValues(...))

            had a drastic impact on the job run time (minutes, as opposed
            to hours after which the first job still hadn't finished!).

            I think I understand why the current logic is implemented the
            way it is: it has to deal with merging windows. But the
            consequence seems to be that it renders the implementation
            very inefficient.

            Has anyone seen similar behavior? Does my analysis of
            the problem seem
            correct?

            Jan

