On Fri, Jun 14, 2019 at 1:02 PM Jan Lukavský <[email protected]> wrote:
>
>  > Interesting. However, there we should never need to sort the windows
> of the input, only the set of live windows (of which there may be any
> number regardless of whether WindowFn does singleton assignments, and
> only then in the merging case).
>
> Ack, however the current implementation creates accumulator from input
> and therefore sorts windows for all elements. Moreover, it doesn't
> distinguish between merging and non-merging windows.

Yes, the create accumulator from input (and then merging the created
accumulators) is where the insane overheads are coming from. We should
not use that pattern.

>  > This seems like a merging vs. non-merging choice, not a
> single-vs-multiple window choice.
>
> I'd say the condition should be non-merging && not many windows per
> element, because otherwise it makes sense to group the windows although
> it is non-merging windowing (sliding with small slide step). Otherwise
> we would explode the data too much.

It depends on the typical number of elements per window. E.g. consider
sliding windows with a size of one hour and a periodicity of one
minute. If we have one datum per hour, better to not explode. If we
have one datum per minute, it's a wash. If we have one datum per
second, much better to explode.

> On 6/14/19 12:19 PM, Robert Bradshaw wrote:
> > On Fri, Jun 14, 2019 at 12:10 PM Jan Lukavský <[email protected]> wrote:
> >> Hi Robert,
> >>
> >> thanks for the discussion. I will create a JIRA with summary of this.
> >> Some comments inline.
> >>
> >> Jan
> >>
> >> On 6/14/19 10:49 AM, Robert Bradshaw wrote:
> >>> On Thu, Jun 13, 2019 at 8:43 PM Jan Lukavský <[email protected]> wrote:
> >>>> On 6/13/19 6:10 PM, Robert Bradshaw wrote:
> >>>>
> >>>> On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]> wrote:
> >>>>> On 6/13/19 4:31 PM, Robert Bradshaw wrote:
> >>>>>
> >>>>> The comment fails to take into account the asymmetry between calling 
> >>>>> addInput vs. mergeAccumulators. It also focuses a lot on the asymptotic 
> >>>>> behavior, when the most common behavior is likely having a single 
> >>>>> (global) window.
> >>>>>
> >>>>> Yes, occurred to me too. There are more questions here:
> >>>>>
> >>>>>    a) it would help if WindowedValue#explodeWindows would return List 
> >>>>> instead of Iterable, because then optimizations would be possible based 
> >>>>> on number of windows (e.g. when there is only single window, there is 
> >>>>> no need to sort anything). This should be simple change, as it already 
> >>>>> is a List.
> >>>> Well, I'm wary of this change, but we could always create a list out of 
> >>>> it (via cast or ImmutableList.copyOf) if we needed.
> >>>>
> >>>> Why so? I thought this would be the least objectionable change, because 
> >>>> it actually *is* a List, and there is no interface, it is just a public 
> >>>> method, that needs to be changed and state the fact correctly. A 
> >>>> Collection would be the same. Iterable is for cases, where you don't 
> >>>> exactly know if the data is stored in memory, or loaded from somewhere 
> >>>> else and whence the size cannot be determined in advance. This is not 
> >>>> the case for WindowFn.
> >>> It constrains us in that if WindowFn returns an iterable and we want
> >>> to store it as such we can no longer do so. I'm also not seeing what
> >>> optimization there is here--the thing we'd want to sort is the set of
> >>> existing windows (plus perhaps this one). Even if we wanted to do the
> >>> sort here, sort of a 1-item list should be insanely cheap.
> >> Agree, that this should be probably suffice to do on the WindowFn. Then
> >> there is no need to retrieve the number of windows for element, because
> >> what actually matters most is whether the count is == 1 or > 1. The
> >> WindowFn can give such information. Sorting on 1-item list on the other
> >> hand is not that cheap as it might look. It invokes TimSort and does
> >> sone calculations that appeared on my CPU profile quite often.
> > Interesting. However, there we should never need to sort the windows
> > of the input, only the set of live windows (of which there may be any
> > number regardless of whether WindowFn does singleton assignments, and
> > only then in the merging case).
> >
> >>>>>    b) I'm a little confused why it is better to keep key with all its 
> >>>>> windows in single WindowedValue in general. My first thoughts would 
> >>>>> really be - explode all windows to (key, window) pairs, and use these 
> >>>>> as keys as much as you can - there is obvious small drawback, this 
> >>>>> might be less efficient for windowing strategies with high number of 
> >>>>> windows per element (sliding windows with small slide compared to 
> >>>>> window length). But that could be added to the WindowFn and decision 
> >>>>> could be made accordingly.
> >>>> The optimization is for sliding windows, where it's more efficient to 
> >>>> send the value with all its windows and explode after the shuffle rather 
> >>>> than duplicate the value before. Of course this breaks that ability with 
> >>>> the hopes of being able to reduce the size more by doing combining. (A 
> >>>> half-way approach would be to group on unexploded window set, which 
> >>>> sounds worse generically but isn't bad in practice.)
> >>>>
> >>>> I'd say, that adding something like WindowFn.isAssigningToSingleWindow() 
> >>>> would solve all the nuances.
> >>> If it helped. As the interface returns a list, I don't see much we can
> >>> skip in this case.
> >> The information can be used to add the (single) window label into the
> >> grouping key and skip all the other stuff.
> > This seems like a merging vs. non-merging choice, not a
> > single-vs-multiple window choice.
> >
> >>>>> Were I to implement this I would let the accumulator be a hashmap 
> >>>>> Window -> Value/Timestamp. For the non-merging, when a WindowedValue 
> >>>>> with N windows comes in, simply do O(N) lookup+addInput calls. Merging 
> >>>>> these accumulator hashmaps is pretty easy. For merging windows, I would 
> >>>>> first invoke the WindowFn to determine which old windows + new windows 
> >>>>> merged into bigger windows, and then construct the values of the bigger 
> >>>>> windows with the appropriate createAccumulator, addInput, and/or 
> >>>>> mergeAccumulators calls, depending on how many old vs. new values there 
> >>>>> are. This is a merging call + O(N) additions.
> >>>>>
> >>>>> Yep, that sounds like it could work. For single window (global, 
> >>>>> tumbling) the approach above would still be probably more efficient.
> >>>> Yes, for the global window one could do away with the (single-valued) 
> >>>> hashmap. Sessions.mergeWindows() does the tumbling to figure out which 
> >>>> windows to merge, so that'd be just as efficient.
> >>>>
> >>>> +1
> >>> Granted we should keep in mind that all of these further optimizations
> >>> probably pale in comparison to just getting rid of using
> >>> mergeAccumulators where we should be using addInput. And this is, in
> >>> the long run, dead code.
> >>>
> >>>
> >>>>> BTW, the code is broken because it hard codes SessionWindows rather 
> >>>>> than calling WindowFn.mergeWindows(...). This is a correctness, not 
> >>>>> just a performance, bug :(.
> >>>>>
> >>>>> Can you point me out in the code?
> >>>> E.g. 
> >>>> https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106
> >>>>  always merges things that are intersecting, rather than querying 
> >>>> WindowFn.mergeWindows to determine which, if any, should be merged.
> >>>>
> >>>>> On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:
> >>>>>> Hi Robert,
> >>>>>>
> >>>>>> there is a comment around that which states, that the current solution 
> >>>>>> should be more efficient. I'd say, that (for non-merging windows) it 
> >>>>>> would be best to first explode windows, and only after that do 
> >>>>>> combineByKey(key & window). Merging windows would have to be handled 
> >>>>>> the way it is, or maybe it would be better to split this to
> >>>>>>
> >>>>>>    1) assign windows to elements
> >>>>>>
> >>>>>>    2) combineByKeyAndWindow
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
> >>>>>>
> >>>>>> I think the problem is that it never leverages the (traditionally much 
> >>>>>> cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it 
> >>>>>> always calls CombineFn.mergeAccumulators(old_accumulator, 
> >>>>>> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It 
> >>>>>> should be feasible to fix this while still handling windowing 
> >>>>>> correctly. (The end-of-window timestamp combiner could also be 
> >>>>>> optimized because the timestamp need not be tracked throughout in that 
> >>>>>> case.)
> >>>>>>
> >>>>>> On the other hand, once we move everything to portability, it's we'll 
> >>>>>> probably toss all this code that use Spark's combiner lifting (instead 
> >>>>>> using the GroupingCombiningTable that's implemented naively in Beam, 
> >>>>>> as we do for Python to avoid fusion breaks).
> >>>>>>
> >>>>>> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I have hit a performance issue with Spark runner, that seems to 
> >>>>>>> related
> >>>>>>> to its current Combine.perKey implementation. I'll try to summarize 
> >>>>>>> what
> >>>>>>> I have found in the code:
> >>>>>>>
> >>>>>>>     - Combine.perKey uses Spark's combineByKey primitive, which is 
> >>>>>>> pretty
> >>>>>>> similar to the definition of CombineFn
> >>>>>>>
> >>>>>>>     - it holds all elements as WindowedValues, and uses
> >>>>>>> Iterable<WindowedValue<Acc>> as accumulator (each WindowedValue holds
> >>>>>>> accumulated state for each window)
> >>>>>>>
> >>>>>>>     - the update function is implemented as
> >>>>>>>
> >>>>>>>      1) convert value to Iterable<WindowedValue<Acc>>
> >>>>>>>
> >>>>>>>      2) merge accumulators for each windows
> >>>>>>>
> >>>>>>> The logic inside createAccumulator and mergeAccumulators is quite
> >>>>>>> non-trivial. The result of profiling is that two frames where the code
> >>>>>>> spends most of the time are:
> >>>>>>>
> >>>>>>>     41633930798   33.18%     4163
> >>>>>>> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
> >>>>>>>     19990682441   15.93%     1999
> >>>>>>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
> >>>>>>>
> >>>>>>> A simple change on code from
> >>>>>>>
> >>>>>>>     PCollection<..> input = ...
> >>>>>>>
> >>>>>>>     input.apply(Combine.perKey(...))
> >>>>>>>
> >>>>>>> to
> >>>>>>>
> >>>>>>>     PCollection<..> input = ...
> >>>>>>>
> >>>>>>>     input
> >>>>>>>
> >>>>>>>       .apply(GroupByKey.create())
> >>>>>>>
> >>>>>>>       .apply(Combine.groupedValues(...))
> >>>>>>>
> >>>>>>> had drastical impact on the job run time (minutes as opposed to hours,
> >>>>>>> after which the first job didn't even finish!).
> >>>>>>>
> >>>>>>> I think I understand the reason why the current logic is implemented 
> >>>>>>> as
> >>>>>>> it is, it has to deal with merging windows. But the consequences seem 
> >>>>>>> to
> >>>>>>> be that it renders the implementation very inefficient.
> >>>>>>>
> >>>>>>> Has anyone seen similar behavior? Does my analysis of the problem seem
> >>>>>>> correct?
> >>>>>>>
> >>>>>>> Jan
> >>>>>>>
> >>>>>>>

Reply via email to