On Mon, Jun 17, 2019 at 10:46 AM Jan Lukavský <[email protected]> wrote:
> > It depends on the typical number of elements per window. E.g.
> > consider sliding windows with a size of one hour and a periodicity of
> > one minute. If we have one datum per hour, better to not explode. If we
> > have one datum per minute, it's a wash. If we have one datum per second,
> > much better to explode.
>
> I'm not sure if I follow. I'd say it works the opposite way. If there is
> one record per second, then exploding windows before the shuffle would
> mean we have to shuffle 60x more data. That becomes critical as more and
> more data is added, so exploding a dataset with 1000s of records per
> second will become really bad.

I was referring specifically to the case where there's a CombineFn, so
we'd be able to do more of the combining before the shuffle.

> Either way, these are runtime conditions that are not known in advance,
> so it cannot be used in pipeline translation.

True. (I suppose one could track and make such decisions dynamically, but
that might not be worth the cost/complexity.)

> I have created JIRA [1] to track that and will try some experiments.

Thanks.

> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7574
>
> On 6/14/19 1:42 PM, Robert Bradshaw wrote:
> > On Fri, Jun 14, 2019 at 1:02 PM Jan Lukavský <[email protected]> wrote:
> >> > Interesting. However, there we should never need to sort the windows
> >> > of the input, only the set of live windows (of which there may be any
> >> > number regardless of whether WindowFn does singleton assignments, and
> >> > only then in the merging case).
> >>
> >> Ack, however the current implementation creates accumulators from input
> >> and therefore sorts windows for all elements. Moreover, it doesn't
> >> distinguish between merging and non-merging windows.
> > Yes, creating an accumulator from each input (and then merging the
> > created accumulators) is where the insane overheads are coming from. We
> > should not use that pattern.
> >
> >> > This seems like a merging vs. non-merging choice, not a
> >> > single-vs-multiple window choice.
> >>
> >> I'd say the condition should be non-merging && not many windows per
> >> element, because otherwise it makes sense to group the windows even
> >> though the windowing is non-merging (sliding windows with a small slide
> >> step). Otherwise we would explode the data too much.
> > It depends on the typical number of elements per window. E.g. consider
> > sliding windows with a size of one hour and a periodicity of one
> > minute. If we have one datum per hour, better to not explode. If we
> > have one datum per minute, it's a wash. If we have one datum per
> > second, much better to explode.
> >
> >> On 6/14/19 12:19 PM, Robert Bradshaw wrote:
> >>> On Fri, Jun 14, 2019 at 12:10 PM Jan Lukavský <[email protected]> wrote:
> >>>> Hi Robert,
> >>>>
> >>>> thanks for the discussion. I will create a JIRA with a summary of
> >>>> this. Some comments inline.
> >>>>
> >>>> Jan
> >>>>
> >>>> On 6/14/19 10:49 AM, Robert Bradshaw wrote:
> >>>>> On Thu, Jun 13, 2019 at 8:43 PM Jan Lukavský <[email protected]> wrote:
> >>>>>> On 6/13/19 6:10 PM, Robert Bradshaw wrote:
> >>>>>>
> >>>>>> On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]> wrote:
> >>>>>>> On 6/13/19 4:31 PM, Robert Bradshaw wrote:
> >>>>>>>
> >>>>>>> The comment fails to take into account the asymmetry between
> >>>>>>> calling addInput vs. mergeAccumulators. It also focuses a lot on
> >>>>>>> the asymptotic behavior, when the most common behavior is likely
> >>>>>>> having a single (global) window.
> >>>>>>>
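
To make that asymmetry concrete, here's a toy sketch (plain Java, not Beam
or runner code; the List-based accumulator is deliberately allocation-heavy
so the cost of the per-element merge path is visible):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Toy sum-of-longs combiner, mirroring the CombineFn call patterns
    // discussed in this thread.
    public class AddInputVsMerge {

      static List<Long> createAccumulator() {
        return new ArrayList<>(List.of(0L)); // allocates
      }

      static List<Long> addInput(List<Long> acc, long value) {
        acc.set(0, acc.get(0) + value); // cheap: in-place, no allocation
        return acc;
      }

      static List<Long> mergeAccumulators(Iterable<List<Long>> accs) {
        List<Long> result = createAccumulator(); // allocates again
        for (List<Long> acc : accs) {
          result.set(0, result.get(0) + acc.get(0));
        }
        return result;
      }

      public static void main(String[] args) {
        List<Long> acc = createAccumulator();
        // The cheap per-element path a lifted combine should take:
        acc = addInput(acc, 42L);
        // What the current translation effectively does per element
        // instead: a fresh accumulator plus a merge, for every value.
        acc = mergeAccumulators(
            Arrays.asList(acc, addInput(createAccumulator(), 42L)));
      }
    }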
> >>>>>>> Yes, occurred to me too. There are more questions here:
> >>>>>>>
> >>>>>>> a) it would help if WindowedValue#explodeWindows would return a
> >>>>>>> List instead of an Iterable, because then optimizations would be
> >>>>>>> possible based on the number of windows (e.g. when there is only a
> >>>>>>> single window, there is no need to sort anything). This should be a
> >>>>>>> simple change, as it already is a List.
> >>>>>> Well, I'm wary of this change, but we could always create a list
> >>>>>> out of it (via cast or ImmutableList.copyOf) if we needed.
> >>>>>>
> >>>>>> Why so? I thought this would be the least objectionable change,
> >>>>>> because it actually *is* a List, and there is no interface; it is
> >>>>>> just a public method that needs to be changed to state that fact
> >>>>>> correctly. A Collection would be the same. Iterable is for cases
> >>>>>> where you don't know whether the data is stored in memory or loaded
> >>>>>> from somewhere else, and hence the size cannot be determined in
> >>>>>> advance. This is not the case for WindowFn.
> >>>>> It constrains us in that if WindowFn returns an iterable and we want
> >>>>> to store it as such, we can no longer do so. I'm also not seeing what
> >>>>> optimization there is here--the thing we'd want to sort is the set of
> >>>>> existing windows (plus perhaps this one). Even if we wanted to do the
> >>>>> sort here, a sort of a 1-item list should be insanely cheap.
> >>>> Agreed, it should probably suffice to do this on the WindowFn. Then
> >>>> there is no need to retrieve the number of windows per element,
> >>>> because what actually matters most is whether the count is == 1 or
> >>>> > 1. The WindowFn can give such information. Sorting a 1-item list,
> >>>> on the other hand, is not as cheap as it might look. It invokes
> >>>> TimSort and does some calculations that appeared in my CPU profile
> >>>> quite often.
> >>> Interesting. However, there we should never need to sort the windows
> >>> of the input, only the set of live windows (of which there may be any
> >>> number regardless of whether WindowFn does singleton assignments, and
> >>> only then in the merging case).
> >>>
> >>>>>>> b) I'm a little confused why it is better in general to keep a key
> >>>>>>> with all its windows in a single WindowedValue. My first thought
> >>>>>>> would really be: explode all windows into (key, window) pairs, and
> >>>>>>> use these as keys as much as you can. There is an obvious small
> >>>>>>> drawback--this might be less efficient for windowing strategies
> >>>>>>> with a high number of windows per element (sliding windows with a
> >>>>>>> small slide compared to the window length). But that could be added
> >>>>>>> to the WindowFn, and the decision could be made accordingly.
> >>>>>> The optimization is for sliding windows, where it's more efficient
> >>>>>> to send the value with all its windows and explode after the shuffle
> >>>>>> rather than duplicate the value before. Of course this breaks that
> >>>>>> ability, with the hope of being able to reduce the size more by
> >>>>>> doing combining. (A half-way approach would be to group on the
> >>>>>> unexploded window set, which sounds worse generically but isn't bad
> >>>>>> in practice.)
> >>>>>>
> >>>>>> I'd say that adding something like
> >>>>>> WindowFn.isAssigningToSingleWindow() would solve all the nuances.
> >>>>> If it helped. As the interface returns a list, I don't see much we
> >>>>> can skip in this case.
> >>>> The information can be used to add the (single) window label into the
> >>>> grouping key and skip all the other stuff.
> >>> This seems like a merging vs. non-merging choice, not a
> >>> single-vs-multiple window choice.
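
Roughly, the proposal could look like the following self-contained sketch.
All names are hypothetical stand-ins -- in particular
isAssigningToSingleWindow(), the hook proposed above, is not claimed here
to be an existing Beam API:

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Collection;

    // Sketch: if the WindowFn guarantees one window per element, fold
    // that window into the shuffle key and skip per-element window
    // sorting/grouping entirely.
    public class SingleWindowSketch {

      interface SketchWindowFn<W> {
        Collection<W> assignWindows(long timestampMillis);

        // Proposed hook: does assignWindows always return exactly one
        // window (e.g. a global or fixed/tumbling windowing)?
        default boolean isAssigningToSingleWindow() {
          return false; // conservative default
        }
      }

      static <K, W> Object shuffleKey(
          K key, long timestampMillis, SketchWindowFn<W> fn) {
        if (fn.isAssigningToSingleWindow()) {
          W onlyWindow = fn.assignWindows(timestampMillis).iterator().next();
          return new SimpleImmutableEntry<>(key, onlyWindow); // (key, window)
        }
        // Otherwise keep the plain key; windows are handled downstream.
        return key;
      }
    }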
> >>>>>>> Were I to implement this, I would let the accumulator be a hashmap
> >>>>>>> Window -> Value/Timestamp. For the non-merging case, when a
> >>>>>>> WindowedValue with N windows comes in, simply do O(N)
> >>>>>>> lookup+addInput calls. Merging these accumulator hashmaps is pretty
> >>>>>>> easy. For merging windows, I would first invoke the WindowFn to
> >>>>>>> determine which old windows + new windows merge into bigger
> >>>>>>> windows, and then construct the values of the bigger windows with
> >>>>>>> the appropriate createAccumulator, addInput, and/or
> >>>>>>> mergeAccumulators calls, depending on how many old vs. new values
> >>>>>>> there are. This is a merging call + O(N) additions.
> >>>>>>>
> >>>>>>> Yep, that sounds like it could work. For a single window (global,
> >>>>>>> tumbling) the approach above would probably still be more
> >>>>>>> efficient.
> >>>>>> Yes, for the global window one could do away with the
> >>>>>> (single-valued) hashmap. Sessions.mergeWindows() does the tumbling
> >>>>>> to figure out which windows to merge, so that'd be just as
> >>>>>> efficient.
> >>>>>>
> >>>>>> +1
> >>>>> Granted, we should keep in mind that all of these further
> >>>>> optimizations probably pale in comparison to just getting rid of
> >>>>> using mergeAccumulators where we should be using addInput. And this
> >>>>> is, in the long run, dead code.
> >>>>>
> >>>>>>> BTW, the code is broken because it hard-codes SessionWindows rather
> >>>>>>> than calling WindowFn.mergeWindows(...). This is a correctness, not
> >>>>>>> just a performance, bug :(.
> >>>>>>>
> >>>>>>> Can you point me to it in the code?
> >>>>>> E.g.
> >>>>>> https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106
> >>>>>> always merges things that are intersecting, rather than querying
> >>>>>> WindowFn.mergeWindows to determine which, if any, should be merged.
> >>>>>>
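
A sketch of that per-window accumulator map for the non-merging case
(stand-in types throughout; CombineFnLike is a simplified two-argument
stand-in for Beam's CombineFn, whose mergeAccumulators really takes an
Iterable):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of a per-window accumulator map for non-merging WindowFns.
    public class WindowedAccumulator<W, InputT, AccumT> {

      // Simplified stand-in for Beam's CombineFn.
      interface CombineFnLike<InputT, AccumT> {
        AccumT createAccumulator();
        AccumT addInput(AccumT accumulator, InputT input);
        AccumT mergeAccumulators(AccumT a, AccumT b);
      }

      private final CombineFnLike<InputT, AccumT> fn;
      private final Map<W, AccumT> perWindow = new HashMap<>();

      WindowedAccumulator(CombineFnLike<InputT, AccumT> fn) {
        this.fn = fn;
      }

      // A value with N windows costs O(N) lookup+addInput calls; no
      // per-element createAccumulator+mergeAccumulators, no sorting.
      void add(InputT value, Iterable<W> windows) {
        for (W window : windows) {
          AccumT acc =
              perWindow.computeIfAbsent(window, w -> fn.createAccumulator());
          perWindow.put(window, fn.addInput(acc, value));
        }
      }

      // Merging two of these maps (e.g. in Spark's mergeCombiners) is a
      // per-window mergeAccumulators. For merging WindowFns, the window
      // sets would first have to be reconciled via
      // WindowFn.mergeWindows(...), not hard-coded session logic.
      void mergeWith(WindowedAccumulator<W, InputT, AccumT> other) {
        other.perWindow.forEach(
            (window, acc) ->
                perWindow.merge(window, acc, fn::mergeAccumulators));
      }
    }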
> >>>>>>> On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]> wrote:
> >>>>>>>> Hi Robert,
> >>>>>>>>
> >>>>>>>> there is a comment around that which states that the current
> >>>>>>>> solution should be more efficient. I'd say that (for non-merging
> >>>>>>>> windows) it would be best to first explode windows, and only after
> >>>>>>>> that do combineByKey(key & window). Merging windows would have to
> >>>>>>>> be handled the way they are now, or maybe it would be better to
> >>>>>>>> split this into
> >>>>>>>>
> >>>>>>>> 1) assign windows to elements
> >>>>>>>>
> >>>>>>>> 2) combineByKeyAndWindow
> >>>>>>>>
> >>>>>>>> Jan
> >>>>>>>>
> >>>>>>>> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
> >>>>>>>>
> >>>>>>>> I think the problem is that it never leverages the (traditionally
> >>>>>>>> much cheaper) CombineFn.addInput(old_accumulator, new_value).
> >>>>>>>> Instead, it always calls
> >>>>>>>> CombineFn.mergeAccumulators(old_accumulator,
> >>>>>>>> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It
> >>>>>>>> should be feasible to fix this while still handling windowing
> >>>>>>>> correctly. (The end-of-window timestamp combiner could also be
> >>>>>>>> optimized, because the timestamp need not be tracked throughout in
> >>>>>>>> that case.)
> >>>>>>>>
> >>>>>>>> On the other hand, once we move everything to portability, we'll
> >>>>>>>> probably toss all this code that uses Spark's combiner lifting
> >>>>>>>> (instead using the GroupingCombiningTable that's implemented
> >>>>>>>> naively in Beam, as we do for Python to avoid fusion breaks).
> >>>>>>>>
> >>>>>>>> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]> wrote:
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I have hit a performance issue with the Spark runner that seems
> >>>>>>>>> to be related to its current Combine.perKey implementation. I'll
> >>>>>>>>> try to summarize what I have found in the code:
> >>>>>>>>>
> >>>>>>>>> - Combine.perKey uses Spark's combineByKey primitive, which is
> >>>>>>>>> pretty similar to the definition of CombineFn
> >>>>>>>>>
> >>>>>>>>> - it holds all elements as WindowedValues, and uses
> >>>>>>>>> Iterable<WindowedValue<Acc>> as the accumulator (each
> >>>>>>>>> WindowedValue holds the accumulated state for one window)
> >>>>>>>>>
> >>>>>>>>> - the update function is implemented as
> >>>>>>>>>
> >>>>>>>>> 1) convert the value to Iterable<WindowedValue<Acc>>
> >>>>>>>>>
> >>>>>>>>> 2) merge the accumulators for each window
> >>>>>>>>>
> >>>>>>>>> The logic inside createAccumulator and mergeAccumulators is quite
> >>>>>>>>> non-trivial. The result of profiling is that the two frames where
> >>>>>>>>> the code spends most of the time are:
> >>>>>>>>>
> >>>>>>>>> 41633930798 33.18% 4163
> >>>>>>>>> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
> >>>>>>>>> 19990682441 15.93% 1999
> >>>>>>>>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
> >>>>>>>>>
> >>>>>>>>> A simple change of the code from
> >>>>>>>>>
> >>>>>>>>> PCollection<..> input = ...
> >>>>>>>>>
> >>>>>>>>> input.apply(Combine.perKey(...))
> >>>>>>>>>
> >>>>>>>>> to
> >>>>>>>>>
> >>>>>>>>> PCollection<..> input = ...
> >>>>>>>>>
> >>>>>>>>> input
> >>>>>>>>>
> >>>>>>>>> .apply(GroupByKey.create())
> >>>>>>>>>
> >>>>>>>>> .apply(Combine.groupedValues(...))
> >>>>>>>>>
> >>>>>>>>> had a drastic impact on the job run time (minutes, as opposed to
> >>>>>>>>> hours after which the first job still hadn't finished!).
> >>>>>>>>>
> >>>>>>>>> I think I understand why the current logic is implemented the way
> >>>>>>>>> it is -- it has to deal with merging windows. But the consequence
> >>>>>>>>> seems to be that it renders the implementation very inefficient.
> >>>>>>>>>
> >>>>>>>>> Has anyone seen similar behavior? Does my analysis of the problem
> >>>>>>>>> seem correct?
> >>>>>>>>>
> >>>>>>>>> Jan
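
For reference, a windowing-free sketch of how combineByKey's three
arguments can line up with a CombineFn -- explicitly not the runner's
actual code (which wraps everything in WindowedValues, and per this thread
pays for it), just the shape in which the cheap addInput path belongs:

    import java.util.Arrays;
    import org.apache.beam.sdk.transforms.Combine.CombineFn;
    import org.apache.spark.api.java.JavaPairRDD;

    // Sketch only: ignores windowing and assumes fn is serializable
    // (Beam CombineFns are).
    public class CombinePerKeySketch {
      static <K, V, AccumT> JavaPairRDD<K, AccumT> combinePerKey(
          JavaPairRDD<K, V> input, CombineFn<V, AccumT, ?> fn) {
        return input.combineByKey(
            value -> fn.addInput(fn.createAccumulator(), value),  // createCombiner
            (acc, value) -> fn.addInput(acc, value),               // mergeValue
            (a, b) -> fn.mergeAccumulators(Arrays.asList(a, b)));  // mergeCombiners
      }
    }

The hot SparkKeyedCombineFn.mergeCombiners frame in the profile above
corresponds to the mergeValue slot effectively doing
mergeAccumulators(acc, addInput(createAccumulator(), value)) per element,
which is the pattern described earlier in this thread.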
