On Mon, Jun 17, 2019 at 10:46 AM Jan Lukavský <[email protected]> wrote:

>  > It depends on the typical number of elements per window. E.g.
> consider sliding windows with a size of one hour and a periodicity of
> one minute. If we have one datum per hour, better to not explode. If we
> have one datum per minute, it's a wash. If we have one datum per second,
> much better to explode.
>
> I'm not sure if I follow. I'd say it works the opposite. If there is one
> record per second, then exploding windows before shuffle would mean we
> have to shuffle 60x more data. That becomes critical as more and more
> data is added, so exploding a dataset with 1000s of records per second
> will become really bad.
>

I was referring specifically to the case where there's a CombineFn, so we'd
be able to do more of the combining before the shuffle.


> Either way, these are runtime conditions that are not known in advance,
> so it cannot be used in pipeline translation.
>

True. (I suppose one could track and make such decisions dynamically, but
that might not be worth the cost/complexity.)


> I have created JIRA [1] to track that and will try some experiments.
>

Thanks.


>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7574
>
> On 6/14/19 1:42 PM, Robert Bradshaw wrote:
> > On Fri, Jun 14, 2019 at 1:02 PM Jan Lukavský <[email protected]> wrote:
> >>   > Interesting. However, there we should never need to sort the windows
> >> of the input, only the set of live windows (of which there may be any
> >> number regardless of whether WindowFn does singleton assignments, and
> >> only then in the merging case).
> >>
> >> Ack, however the current implementation creates accumulator from input
> >> and therefore sorts windows for all elements. Moreover, it doesn't
> >> distinguish between merging and non-merging windows.
> > Yes, the create accumulator from input (and then merging the created
> > accumulators) is where the insane overheads are coming from. We should
> > not use that pattern.
> >
> >>   > This seems like a merging vs. non-merging choice, not a
> >> single-vs-multiple window choice.
> >>
> >> I'd say the condition should be non-merging && not many windows per
> >> element, because otherwise it makes sense to group the windows although
> >> it is non-merging windowing (sliding with small slide step). Otherwise
> >> we would explode the data too much.
> > It depends on the typical number of elements per window. E.g. consider
> > sliding windows with a size of one hour and a periodicity of one
> > minute. If we have one datum per hour, better to not explode. If we
> > have one datum per minute, it's a wash. If we have one datum per
> > second, much better to explode.
> >
> >> On 6/14/19 12:19 PM, Robert Bradshaw wrote:
> >>> On Fri, Jun 14, 2019 at 12:10 PM Jan Lukavský <[email protected]> wrote:
> >>>> Hi Robert,
> >>>>
> >>>> thanks for the discussion. I will create a JIRA with summary of this.
> >>>> Some comments inline.
> >>>>
> >>>> Jan
> >>>>
> >>>> On 6/14/19 10:49 AM, Robert Bradshaw wrote:
> >>>>> On Thu, Jun 13, 2019 at 8:43 PM Jan Lukavský <[email protected]>
> wrote:
> >>>>>> On 6/13/19 6:10 PM, Robert Bradshaw wrote:
> >>>>>>
> >>>>>> On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský <[email protected]>
> wrote:
> >>>>>>> On 6/13/19 4:31 PM, Robert Bradshaw wrote:
> >>>>>>>
> >>>>>>> The comment fails to take into account the asymmetry between
> calling addInput vs. mergeAccumulators. It also focuses a lot on the
> asymptotic behavior, when the most common behavior is likely having a
> single (global) window.
> >>>>>>>
> >>>>>>> Yes, occurred to me too. There are more questions here:
> >>>>>>>
> >>>>>>>     a) it would help if WindowedValue#explodeWindows would return
> List instead of Iterable, because then optimizations would be possible
> based on number of windows (e.g. when there is only single window, there is
> no need to sort anything). This should be simple change, as it already is a
> List.
> >>>>>> Well, I'm wary of this change, but we could always create a list
> out of it (via cast or ImmutableList.copyOf) if we needed.
> >>>>>>
> >>>>>> Why so? I thought this would be the least objectionable change,
> because it actually *is* a List, and there is no interface, it is just a
> public method, that needs to be changed and state the fact correctly. A
> Collection would be the same. Iterable is for cases, where you don't
> exactly know if the data is stored in memory, or loaded from somewhere else
> and whence the size cannot be determined in advance. This is not the case
> for WindowFn.
> >>>>> It constrains us in that if WindowFn returns an iterable and we want
> >>>>> to store it as such we can no longer do so. I'm also not seeing what
> >>>>> optimization there is here--the thing we'd want to sort is the set of
> >>>>> existing windows (plus perhaps this one). Even if we wanted to do the
> >>>>> sort here, sort of a 1-item list should be insanely cheap.
> >>>> Agree, that this should be probably suffice to do on the WindowFn.
> Then
> >>>> there is no need to retrieve the number of windows for element,
> because
> >>>> what actually matters most is whether the count is == 1 or > 1. The
> >>>> WindowFn can give such information. Sorting on 1-item list on the
> other
> >>>> hand is not that cheap as it might look. It invokes TimSort and does
> >>>> sone calculations that appeared on my CPU profile quite often.
> >>> Interesting. However, there we should never need to sort the windows
> >>> of the input, only the set of live windows (of which there may be any
> >>> number regardless of whether WindowFn does singleton assignments, and
> >>> only then in the merging case).
> >>>
> >>>>>>>     b) I'm a little confused why it is better to keep key with all
> its windows in single WindowedValue in general. My first thoughts would
> really be - explode all windows to (key, window) pairs, and use these as
> keys as much as you can - there is obvious small drawback, this might be
> less efficient for windowing strategies with high number of windows per
> element (sliding windows with small slide compared to window length). But
> that could be added to the WindowFn and decision could be made accordingly.
> >>>>>> The optimization is for sliding windows, where it's more efficient
> to send the value with all its windows and explode after the shuffle rather
> than duplicate the value before. Of course this breaks that ability with
> the hopes of being able to reduce the size more by doing combining. (A
> half-way approach would be to group on unexploded window set, which sounds
> worse generically but isn't bad in practice.)
> >>>>>>
> >>>>>> I'd say, that adding something like
> WindowFn.isAssigningToSingleWindow() would solve all the nuances.
> >>>>> If it helped. As the interface returns a list, I don't see much we
> can
> >>>>> skip in this case.
> >>>> The information can be used to add the (single) window label into the
> >>>> grouping key and skip all the other stuff.
> >>> This seems like a merging vs. non-merging choice, not a
> >>> single-vs-multiple window choice.
> >>>
> >>>>>>> Were I to implement this I would let the accumulator be a hashmap
> Window -> Value/Timestamp. For the non-merging, when a WindowedValue with N
> windows comes in, simply do O(N) lookup+addInput calls. Merging these
> accumulator hashmaps is pretty easy. For merging windows, I would first
> invoke the WindowFn to determine which old windows + new windows merged
> into bigger windows, and then construct the values of the bigger windows
> with the appropriate createAccumulator, addInput, and/or mergeAccumulators
> calls, depending on how many old vs. new values there are. This is a
> merging call + O(N) additions.
> >>>>>>>
> >>>>>>> Yep, that sounds like it could work. For single window (global,
> tumbling) the approach above would still be probably more efficient.
> >>>>>> Yes, for the global window one could do away with the
> (single-valued) hashmap. Sessions.mergeWindows() does the tumbling to
> figure out which windows to merge, so that'd be just as efficient.
> >>>>>>
> >>>>>> +1
> >>>>> Granted we should keep in mind that all of these further
> optimizations
> >>>>> probably pale in comparison to just getting rid of using
> >>>>> mergeAccumulators where we should be using addInput. And this is, in
> >>>>> the long run, dead code.
> >>>>>
> >>>>>
> >>>>>>> BTW, the code is broken because it hard codes SessionWindows
> rather than calling WindowFn.mergeWindows(...). This is a correctness, not
> just a performance, bug :(.
> >>>>>>>
> >>>>>>> Can you point me out in the code?
> >>>>>> E.g.
> https://github.com/apache/beam/blob/release-2.13.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L106
> always merges things that are intersecting, rather than querying
> WindowFn.mergeWindows to determine which, if any, should be merged.
> >>>>>>
> >>>>>>> On Thu, Jun 13, 2019 at 3:56 PM Jan Lukavský <[email protected]>
> wrote:
> >>>>>>>> Hi Robert,
> >>>>>>>>
> >>>>>>>> there is a comment around that which states, that the current
> solution should be more efficient. I'd say, that (for non-merging windows)
> it would be best to first explode windows, and only after that do
> combineByKey(key & window). Merging windows would have to be handled the
> way it is, or maybe it would be better to split this to
> >>>>>>>>
> >>>>>>>>     1) assign windows to elements
> >>>>>>>>
> >>>>>>>>     2) combineByKeyAndWindow
> >>>>>>>>
> >>>>>>>> Jan
> >>>>>>>>
> >>>>>>>> On 6/13/19 3:51 PM, Robert Bradshaw wrote:
> >>>>>>>>
> >>>>>>>> I think the problem is that it never leverages the (traditionally
> much cheaper) CombineFn.addInput(old_accumulator, new_value). Instead, it
> always calls CombineFn.mergeAccumulators(old_accumulator,
> CombineFn.addInput(CombineFn.createAccumulator(), new_value)). It should be
> feasible to fix this while still handling windowing correctly. (The
> end-of-window timestamp combiner could also be optimized because the
> timestamp need not be tracked throughout in that case.)
> >>>>>>>>
> >>>>>>>> On the other hand, once we move everything to portability, it's
> we'll probably toss all this code that use Spark's combiner lifting
> (instead using the GroupingCombiningTable that's implemented naively in
> Beam, as we do for Python to avoid fusion breaks).
> >>>>>>>>
> >>>>>>>> On Thu, Jun 13, 2019 at 3:20 PM Jan Lukavský <[email protected]>
> wrote:
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I have hit a performance issue with Spark runner, that seems to
> related
> >>>>>>>>> to its current Combine.perKey implementation. I'll try to
> summarize what
> >>>>>>>>> I have found in the code:
> >>>>>>>>>
> >>>>>>>>>      - Combine.perKey uses Spark's combineByKey primitive, which
> is pretty
> >>>>>>>>> similar to the definition of CombineFn
> >>>>>>>>>
> >>>>>>>>>      - it holds all elements as WindowedValues, and uses
> >>>>>>>>> Iterable<WindowedValue<Acc>> as accumulator (each WindowedValue
> holds
> >>>>>>>>> accumulated state for each window)
> >>>>>>>>>
> >>>>>>>>>      - the update function is implemented as
> >>>>>>>>>
> >>>>>>>>>       1) convert value to Iterable<WindowedValue<Acc>>
> >>>>>>>>>
> >>>>>>>>>       2) merge accumulators for each windows
> >>>>>>>>>
> >>>>>>>>> The logic inside createAccumulator and mergeAccumulators is quite
> >>>>>>>>> non-trivial. The result of profiling is that two frames where
> the code
> >>>>>>>>> spends most of the time are:
> >>>>>>>>>
> >>>>>>>>>      41633930798   33.18%     4163
> >>>>>>>>>
> org.apache.beam.runners.spark.translation.SparkKeyedCombineFn.mergeCombiners
> >>>>>>>>>      19990682441   15.93%     1999
> >>>>>>>>>
> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.unmodifiableIterable
> >>>>>>>>>
> >>>>>>>>> A simple change on code from
> >>>>>>>>>
> >>>>>>>>>      PCollection<..> input = ...
> >>>>>>>>>
> >>>>>>>>>      input.apply(Combine.perKey(...))
> >>>>>>>>>
> >>>>>>>>> to
> >>>>>>>>>
> >>>>>>>>>      PCollection<..> input = ...
> >>>>>>>>>
> >>>>>>>>>      input
> >>>>>>>>>
> >>>>>>>>>        .apply(GroupByKey.create())
> >>>>>>>>>
> >>>>>>>>>        .apply(Combine.groupedValues(...))
> >>>>>>>>>
> >>>>>>>>> had drastical impact on the job run time (minutes as opposed to
> hours,
> >>>>>>>>> after which the first job didn't even finish!).
> >>>>>>>>>
> >>>>>>>>> I think I understand the reason why the current logic is
> implemented as
> >>>>>>>>> it is, it has to deal with merging windows. But the consequences
> seem to
> >>>>>>>>> be that it renders the implementation very inefficient.
> >>>>>>>>>
> >>>>>>>>> Has anyone seen similar behavior? Does my analysis of the
> problem seem
> >>>>>>>>> correct?
> >>>>>>>>>
> >>>>>>>>> Jan
> >>>>>>>>>
> >>>>>>>>>
>

Reply via email to