On Tue, Oct 29, 2019 at 7:01 PM Aaron Dixon <atdi...@gmail.com> wrote:
>
> Thank you, Luke and Robert. Sorry for hitting dev@, I criss-crossed and meant 
> to hit user@, but as we're here could you clarify your two points, however--

No problem. This is veering into dev@ territory anyway :).

> 1) I am under the impression that the 4,000 sliding windows approach (30 days 
> every 10m) will re-evaluate my combine aggregation every 10m whereas with the 
> two-window approach my Combine aggregation would evolve iteratively, only 
> merging new results into the aggregation.
>
> If there's a cross-window optimization occurring that would allow iterative 
> combining _across windows_, given the substantial order of magnitude 
> difference in scale at play, is it safe to consider such 'internal 
> optimization detail' part of the platform contract (Dataflow's, say)? 
> Otherwise it would be hard to lean on this from a production system that will 
> live into the future.

OK, let's first define exactly what (I think) you're trying to
compute. Let's label windows by their (upper) endpoint. So, every 10
minutes you have a window W_t and an aggregate Aggr(e for e in
all_events if t - 60days <= timestamp(e) < t).

The way this is computed in Beam is by storing a map W_t ->
RunningAggregate and whenever we see an element with timestamp T we
assign it to the set of windows S = {W_t : T in W_t} (in this case
there would be 30*24*6 = 4320 of them) and subsequently update all the
running aggregates. When we are sure we've seen all elements up to t
(the watermark) we release window W_t with its computed aggregate
downstream.

An alternative that's often proposed, and works only for aligned
sliding windows, is to instead store a map of 10-minute buckets to
running aggregates, and whenever an element comes in we add its value
to the aggregate of that bucket. This side is cheaper, but every time
the watermark tells us we're able to release a window we then have to
compute an Aggregate over all 4000 of these buckets.

A further extension, if the aggregation function is reversible, is to
keep a running total, and every time we release a window, we add the
next bucket's contribution, and remove the previous buckets
contribution, to this running total. If the computation is not
reversible, we can compute a "binary tree" of aggregates (e.g. 10-min
buckets, 20-min buckets, 40-min buckets, ...) and perform log(N)
aggregations each time and element comes in and log(N) every time a
window is released.

Each of these is more specialized to the exact shape of sliding
windows, and though they may allow us to save some compute, still
require the storage of 4320 (or even 4320 log(N)) bits of state. Often
the compute is virtually free compared to the cost of reading/writing
state, which means there are less (or even no) advantages for the
specialized methods (depending on how sparse the data is), especially
if one does blind writes and only has to absorb latency on reads.

As mentioned, Beam does the first, and though we've looked at the
others there has not been a compelling case made that they would
actually help much if at all in practice.


As for what would be computed with 60/30-day sliding windows with 10
minute triggers, the output would be (approximately, due to mixing
processing and event time, though at this granularity they are likely
to line up mostly), W_t_i where t jumps in 30 day increments and i
jumps in 10 minute increments and the aggregate assigned to W_t_i is
Aggr(e for e in all_events where t-60days < timestamp(e) < i) which is
not quite the same thing. E.g. let's say we have a single event
(aggregation count) every 10 minute interval. The sliding windows
output would give 4320 evens for every 30-day window, published every
10 minutes. The 60/30-day would re-publish each of the windows [0,
60day), [30day, 90day), [60day, 120day), ... with the values 1, 2, 3,
4, ..., 4320. In other words, at time t=45day, the window  [0, 60day)
would trigger with a value of the 6480 values that had fallen in this
window so far, and the [30day, 90day) window would trigger with the
2160 seen so far, neither of which are accurate representations of
"from now until 30 days ago."

I suppose one could avoid doing any pre-aggregation, and emit all of
the events (with reified timestamp) in 60/30-day windows, then have a
DoFn that filters on the events and computes each of the 10-minute
aggregates over the "true" sliding window (4320 outputs). This could
be cheaper if your events are very sparse, will be more expensive if
they're very dense, and it's unclear what the tradeoff will be.

I would try the straightforward approach and see if that works,
because it just well might be good enough that the additional
complexity isn't worth it (if even a measurable improvement). You can
play with various windowing/triggering strategies at
https://window-explorer.appspot.com/ (though unfortunately processing
time triggers are not represented).

> 2) When you say "regardless of the how the problem is structured" there are 
> 4,000 stored 'sub-aggregations', even in the two-window approach--why is that 
> so? Isn't the volume of panes produced by a trigger a function of what keys 
> have actually received new values *in the window*?

True, if most 10-minute intervals that have no event then there are
further optimizations one can do.

> Thanks for help in understanding these details. I want to make good use of 
> Beam and hope to contribute back at some point (docs/writing etc), once I can 
> come to terms with all of these pieces.
>
> On 2019/10/29 20:39:18, Robert Bradshaw <rober...@google.com> wrote:
> > No matter how the problem is structured, computing 30 day aggregations
> > for every 10 minute window requires storing at least 30day/10min =
> > ~4000 sub-aggregations. In Beam, the elements themselves are not
> > stored in every window, only the intermediate aggregates.
> >
> > I second Luke's suggestion to try it out and see if this is indeed a
> > prohibitive bottleneck.
> >
> > On Tue, Oct 29, 2019 at 1:29 PM Luke Cwik <lc...@google.com> wrote:
> > >
> > > You should first try the obvious answer of using a sliding window of 30 
> > > days every 10 minutes before you try the 60 days every 30 days.
> > > Beam has some optimizations which will assign a value to multiple windows 
> > > and only process that value once even if its in many windows. If that 
> > > doesn't perform well, then come back to dev@ and look to optimize.
> > >
> > > On Tue, Oct 29, 2019 at 1:22 PM Aaron Dixon <atdi...@gmail.com> wrote:
> > >>
> > >> Hi I am new to Beam.
> > >>
> > >> I would like to accumulate data over 30 day period and perform a running 
> > >> aggregation over this data, say every 10 minutes.
> > >>
> > >> I could use a sliding window of 30 days every 10 minutes (triggering at 
> > >> end of window) but this seems grossly inefficient (both in terms of # of 
> > >> windows at play and # of events duplicated across these windows).
> > >>
> > >> A more efficient strategy seems to be to use a sliding window of 60 days 
> > >> every 30 days -- triggering every 10 minutes -- so that I'm guaranteed 
> > >> to have 30 days worth of data aggregated/combined in at least one of the 
> > >> 2 at-play sliding windows.
> > >>
> > >> The last piece of this puzzle however would be to do a final global 
> > >> aggregation over only the keys from the latest trigger of the earlier 
> > >> sliding window.
> > >>
> > >> But Beam does not seem to offer a way to orchestrate this. Even though 
> > >> this seems like it would be a pretty common or fundamental ask.
> > >>
> > >> One thought I had was to re-window in a way that would isolate keys 
> > >> triggered at the same time, in the same window but I don't see any 
> > >> contracts from Beam that would allow an approach like that.
> > >>
> > >> What am I missing?
> > >>
> > >>
> >

Reply via email to