On Fri, Jun 14, 2019 at 1:02 PM Jan Lukavský <[email protected]> wrote:
>
> > Interesting. However, there we should never need to sort the windows
> > of the input, only the set of live windows (of which there may be any
> > number regardless of whether WindowFn does singleton assignments, and
> > only then in the merging case).
>
> Ack, however the current implementation creates an accumulator from the
> input and therefore sorts windows for all elements. Moreover, it doesn't
> distinguish between merging and non-merging windows.

Yes, the create-accumulator-from-input pattern (and then merging the
created accumulators) is where the insane overheads are coming from. We
should not use that pattern.
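(To make the contrast concrete, a minimal sketch; combineFn stands for any
Beam Combine.CombineFn<InputT, AccumT, OutputT>, and the method and
variable names are illustrative rather than the runner's own:)

  static <InputT, AccumT> AccumT update(
      Combine.CombineFn<InputT, AccumT, ?> combineFn, AccumT accumulator, InputT value) {
    // The pattern described above: for every element, allocate a fresh
    // accumulator, add the single element to it, then pay a full merge.
    AccumT slow =
        combineFn.mergeAccumulators(
            Arrays.asList(
                accumulator,
                combineFn.addInput(combineFn.createAccumulator(), value)));

    // The cheap path: one addInput call against the existing accumulator.
    AccumT fast = combineFn.addInput(accumulator, value);
    return fast;
  }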
> > This seems like a merging vs. non-merging choice, not a
> > single-vs-multiple window choice.
>
> I'd say the condition should be non-merging && not many windows per
> element, because otherwise it makes sense to group the windows although
> it is non-merging windowing (sliding with a small slide step). Otherwise
> we would explode the data too much.

It depends on the typical number of elements per window. E.g. consider
sliding windows with a size of one hour and a periodicity of one minute.
If we have one datum per hour, better not to explode. If we have one
datum per minute, it's a wash. If we have one datum per second, much
better to explode.
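(A back-of-the-envelope model of that trade-off, matching the example
above: 1-hour windows sliding every minute, so 60 windows per element. It
assumes pre-combining collapses all records for one window into a single
accumulator; a sketch, not runner code:)

  static long shuffledRecordsPerHour(long elementsPerHour, boolean explode) {
    final long windowsPerElement = 60;
    if (explode) {
      // At most one pre-combined accumulator per window survives, and
      // never more records than the exploded input itself.
      return Math.min(elementsPerHour * windowsPerElement, windowsPerElement);
    }
    // Each element is shuffled exactly once, carrying all 60 window labels.
    return elementsPerHour;
  }

  // One datum per hour:   exploded 60 vs. unexploded 1    -> don't explode
  // One datum per minute: exploded 60 vs. unexploded 60   -> a wash
  // One datum per second: exploded 60 vs. unexploded 3600 -> explode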
> On 6/14/19 12:19 PM, Robert Bradshaw wrote:
> > On Fri, Jun 14, 2019 at 12:10 PM Jan Lukavský <[email protected]> wrote:
> >> Hi Robert,
> >>
> >> thanks for the discussion. I will create a JIRA with a summary of this.
> >> Some comments inline.
> >>
> >> Jan
> >>
> >> On 6/14/19 10:49 AM, Robert Bradshaw wrote:
> >>> On Thu, Jun 13, 2019 at 8:43 PM Jan Lukavský <[email protected]> wrote:
> >>>> On 6/13/19 6:10 PM, Robert Bradshaw wrote:
> >>>>
> >>>> On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]> wrote:
> >>>>> On 6/13/19 4:31 PM, Robert Bradshaw wrote:
> >>>>>
> >>>>> The comment fails to take into account the asymmetry between calling
> >>>>> addInput vs. mergeAccumulators. It also focuses a lot on the asymptotic
> >>>>> behavior, when the most common behavior is likely having a single
> >>>>> (global) window.
> >>>>>
> >>>>> Yes, occurred to me too. There are more questions here:
> >>>>>
> >>>>> a) it would help if WindowedValue#explodeWindows would return a List
> >>>>> instead of an Iterable, because then optimizations would be possible
> >>>>> based on the number of windows (e.g. when there is only a single
> >>>>> window, there is no need to sort anything). This should be a simple
> >>>>> change, as it already is a List.
> >>>>
> >>>> Well, I'm wary of this change, but we could always create a list out of
> >>>> it (via cast or ImmutableList.copyOf) if we needed to.
> >>>>
> >>>> Why so? I thought this would be the least objectionable change, because
> >>>> it actually *is* a List, and there is no interface, it is just a public
> >>>> method that needs to be changed to state the fact correctly. A
> >>>> Collection would be the same. Iterable is for cases where you don't
> >>>> know exactly whether the data is stored in memory or loaded from
> >>>> somewhere else, and hence the size cannot be determined in advance.
> >>>> This is not the case for WindowFn.
> >>>
> >>> It constrains us in that if WindowFn returns an iterable and we want
> >>> to store it as such, we can no longer do so. I'm also not seeing what
> >>> optimization there is here--the thing we'd want to sort is the set of
> >>> existing windows (plus perhaps this one). Even if we wanted to do the
> >>> sort here, sorting a 1-item list should be insanely cheap.
> >>
> >> Agreed that this should probably suffice to do on the WindowFn. Then
> >> there is no need to retrieve the number of windows for an element,
> >> because what actually matters most is whether the count is == 1 or > 1.
> >> The WindowFn can give such information. Sorting a 1-item list, on the
> >> other hand, is not as cheap as it might look. It invokes TimSort and
> >> does some calculations that appeared in my CPU profile quite often.
> >
> > Interesting. However, there we should never need to sort the windows
> > of the input, only the set of live windows (of which there may be any
> > number regardless of whether WindowFn does singleton assignments, and
> > only then in the merging case).
> >
> >>>>> b) I'm a little confused why it is better to keep a key with all its
> >>>>> windows in a single WindowedValue in general. My first thought would
> >>>>> really be: explode all windows to (key, window) pairs, and use these
> >>>>> as keys as much as you can. There is an obvious small drawback: this
> >>>>> might be less efficient for windowing strategies with a high number
> >>>>> of windows per element (sliding windows with a small slide compared
> >>>>> to the window length). But that could be added to the WindowFn and
> >>>>> the decision could be made accordingly.
> >>>>
> >>>> The optimization is for sliding windows, where it's more efficient to
> >>>> send the value with all its windows and explode after the shuffle rather
> >>>> than duplicate the value before. Of course this breaks that ability with
> >>>> the hopes of being able to reduce the size more by doing combining. (A
> >>>> half-way approach would be to group on the unexploded window set, which
> >>>> sounds worse generically but isn't bad in practice.)
> >>>>
> >>>> I'd say that adding something like WindowFn.isAssigningToSingleWindow()
> >>>> would solve all the nuances.
> >>>
> >>> If it helped. As the interface returns a list, I don't see much we can
> >>> skip in this case.
> >>
> >> The information can be used to add the (single) window label into the
> >> grouping key and skip all the other stuff.
> >
> > This seems like a merging vs. non-merging choice, not a
> > single-vs-multiple window choice.
> >
> >>>>> Were I to implement this I would let the accumulator be a hashmap
> >>>>> Window -> Value/Timestamp. For the non-merging case, when a
> >>>>> WindowedValue with N windows comes in, simply do O(N) lookup+addInput
> >>>>> calls. Merging these accumulator hashmaps is pretty easy. For merging
> >>>>> windows, I would first invoke the WindowFn to determine which old
> >>>>> windows + new windows merge into bigger windows, and then construct
> >>>>> the values of the bigger windows with the appropriate
> >>>>> createAccumulator, addInput, and/or mergeAccumulators calls, depending
> >>>>> on how many old vs. new values there are. This is a merging call +
> >>>>> O(N) additions.
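(A minimal sketch of that map-based accumulator for the non-merging case;
the class and method names are illustrative, not the Spark runner's:)

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.beam.sdk.transforms.Combine.CombineFn;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.util.WindowedValue;

  class MapAccumulator<InputT, AccumT> {
    final Map<BoundedWindow, AccumT> perWindow = new HashMap<>();

    // Adding a WindowedValue with N windows is N cheap lookup+addInput calls.
    void add(CombineFn<InputT, AccumT, ?> fn, WindowedValue<InputT> element) {
      for (BoundedWindow window : element.getWindows()) {
        AccumT acc = perWindow.get(window);
        perWindow.put(
            window,
            fn.addInput(acc == null ? fn.createAccumulator() : acc, element.getValue()));
      }
    }

    // Merging two of these accumulators is a per-window map merge.
    void mergeWith(CombineFn<InputT, AccumT, ?> fn, MapAccumulator<InputT, AccumT> other) {
      for (Map.Entry<BoundedWindow, AccumT> e : other.perWindow.entrySet()) {
        AccumT mine = perWindow.get(e.getKey());
        perWindow.put(
            e.getKey(),
            mine == null
                ? e.getValue()
                : fn.mergeAccumulators(Arrays.asList(mine, e.getValue())));
      }
    }
  }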
> >>>>> Yep, that sounds like it could work. For a single window (global,
> >>>>> tumbling) the approach above would still probably be more efficient.
> >>>>
> >>>> Yes, for the global window one could do away with the (single-valued)
> >>>> hashmap. Sessions.mergeWindows() does the tumbling to figure out which
> >>>> windows to merge, so that'd be just as efficient.
> >>>>
> >>>> +1
> >>>
> >>> Granted, we should keep in mind that all of these further optimizations
> >>> probably pale in comparison to just getting rid of using
> >>> mergeAccumulators where we should be using addInput. And this is, in
> >>> the long run, dead code.
> >>>
> >>>>> BTW, the code is broken because it hard-codes SessionWindows rather
> >>>>> than calling WindowFn.mergeWindows(...). This is a correctness, not
> >>>>> just a performance, bug :(.
> >>>>>
> >>>>> Can you point me to it in the code?
> >>>>
> >>>> E.g.
> >>>> https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106
> >>>> always merges things that are intersecting, rather than querying
> >>>> WindowFn.mergeWindows to determine which, if any, should be merged.
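(What querying the WindowFn instead could look like, as a hedged sketch;
the helper name is illustrative, and the imports are the same as in the
sketch above plus java.util.{ArrayList, Collection, List} and
org.apache.beam.sdk.transforms.windowing.WindowFn. The point is that only
the WindowFn knows which live windows merge:)

  static <InputT, AccumT> void mergeViaWindowFn(
      WindowFn<InputT, BoundedWindow> windowFn,
      CombineFn<InputT, AccumT, ?> combineFn,
      Map<BoundedWindow, AccumT> accumulators)
      throws Exception {
    windowFn.mergeWindows(
        windowFn.new MergeContext() {
          @Override
          public Collection<BoundedWindow> windows() {
            return new ArrayList<>(accumulators.keySet());
          }

          @Override
          public void merge(Collection<BoundedWindow> toBeMerged, BoundedWindow mergeResult) {
            // Combine accumulators only for the windows the WindowFn merged.
            List<AccumT> parts = new ArrayList<>();
            for (BoundedWindow w : toBeMerged) {
              parts.add(accumulators.remove(w));
            }
            accumulators.put(mergeResult, combineFn.mergeAccumulators(parts));
          }
        });
  }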
> >>>>
> >>>>> On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:
> >>>>>> Hi Robert,
> >>>>>>
> >>>>>> there is a comment around that which states that the current solution
> >>>>>> should be more efficient. I'd say that (for non-merging windows) it
> >>>>>> would be best to first explode windows, and only after that do
> >>>>>> combineByKey(key & window). Merging windows would have to be handled
> >>>>>> the way it is, or maybe it would be better to split this into
> >>>>>>
> >>>>>> 1) assign windows to elements
> >>>>>>
> >>>>>> 2) combineByKeyAndWindow
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
> >>>>>>
> >>>>>> I think the problem is that it never leverages the (traditionally much
> >>>>>> cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it
> >>>>>> always calls CombineFn.mergeAccumulators(old_accumulator,
> >>>>>> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It
> >>>>>> should be feasible to fix this while still handling windowing
> >>>>>> correctly. (The end-of-window timestamp combiner could also be
> >>>>>> optimized, because the timestamp need not be tracked throughout in
> >>>>>> that case.)
> >>>>>>
> >>>>>> On the other hand, once we move everything to portability, we'll
> >>>>>> probably toss all this code that uses Spark's combiner lifting
> >>>>>> (instead using the GroupingCombiningTable that's implemented naively
> >>>>>> in Beam, as we do for Python to avoid fusion breaks).
> >>>>>>
> >>>>>> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I have hit a performance issue with the Spark runner that seems to be
> >>>>>>> related to its current Combine.perKey implementation. I'll try to
> >>>>>>> summarize what I have found in the code:
> >>>>>>>
> >>>>>>> - Combine.perKey uses Spark's combineByKey primitive, which is pretty
> >>>>>>> similar to the definition of CombineFn
> >>>>>>>
> >>>>>>> - it holds all elements as WindowedValues, and uses
> >>>>>>> Iterable<WindowedValue<Acc>> as the accumulator (each WindowedValue
> >>>>>>> holds the accumulated state for one window)
> >>>>>>>
> >>>>>>> - the update function is implemented as
> >>>>>>>
> >>>>>>> 1) convert the value to Iterable<WindowedValue<Acc>>
> >>>>>>>
> >>>>>>> 2) merge the accumulators for each window
> >>>>>>>
> >>>>>>> The logic inside createAccumulator and mergeAccumulators is quite
> >>>>>>> non-trivial. The result of profiling is that the two frames where the
> >>>>>>> code spends most of its time are:
> >>>>>>>
> >>>>>>> 41633930798 33.18% 4163 org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
> >>>>>>> 19990682441 15.93% 1999 org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
> >>>>>>>
> >>>>>>> A simple change in the code from
> >>>>>>>
> >>>>>>> PCollection<..> input = ...
> >>>>>>>
> >>>>>>> input.apply(Combine.perKey(...))
> >>>>>>>
> >>>>>>> to
> >>>>>>>
> >>>>>>> PCollection<..> input = ...
> >>>>>>>
> >>>>>>> input
> >>>>>>>     .apply(GroupByKey.create())
> >>>>>>>     .apply(Combine.groupedValues(...))
> >>>>>>>
> >>>>>>> had a drastic impact on the job run time (minutes, as opposed to
> >>>>>>> hours, after which the first job didn't even finish!).
> >>>>>>>
> >>>>>>> I think I understand why the current logic is implemented the way it
> >>>>>>> is: it has to deal with merging windows. But the consequence seems to
> >>>>>>> be that it renders the implementation very inefficient.
> >>>>>>>
> >>>>>>> Has anyone seen similar behavior? Does my analysis of the problem
> >>>>>>> seem correct?
> >>>>>>>
> >>>>>>> Jan
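(For concreteness, the workaround above as a self-contained snippet. It
assumes a PCollection<KV<String, Long>> and a Sum combine, but any
CombineFn slots in the same way:)

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.transforms.Combine;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.Sum;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  Pipeline p = Pipeline.create();
  PCollection<KV<String, Long>> input =
      p.apply(Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L)));

  PCollection<KV<String, Long>> sums =
      input
          .apply(GroupByKey.create())                   // shuffle raw values
          .apply(Combine.groupedValues(Sum.ofLongs())); // combine after the shuffle

Note that this variant forgoes combiner lifting entirely (raw values cross
the shuffle), yet per the report above it still finished in minutes where
the Combine.perKey translation ran for hours.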
