Hey Fabian,

I have been thinking a bit about your comments; here are my thoughts on
them:

Simple reduce:
>Not sure if there are use cases for multiple local reducers. I think I'd
>enforce DOP = 1 in this case.

When I started the discussion yesterday I thought pretty much the same, but
then I realized that since we have custom partitioners between the task
vertices, the user can control which tuples go to which vertex. So local
reducers actually make a lot of sense. For instance, I can implement a range
partitioner on some field and then perform the reduce locally. This argument
applies to all the local stream reducers, even on windows. So maybe there
should be an option for local/global.
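
To make the range partitioner case concrete, here is a rough sketch in plain
Java (not the actual streaming API; RangePartitionedLocalReduce, selectVertex,
run and the bounds array are names I made up for illustration). Each value is
routed to a vertex by range and reduced only with the values on that vertex,
with one emit per incoming value:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

final class RangePartitionedLocalReduce {

    /**
     * Hypothetical range partitioner: upperBounds[i] is the exclusive upper
     * bound of vertex i; the last vertex takes everything above.
     */
    static int selectVertex(int key, int[] upperBounds) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (key < upperBounds[i]) {
                return i;
            }
        }
        return upperBounds.length;
    }

    /** Routes each value by range, reduces it locally, and records every emit per vertex. */
    static List<List<Integer>> run(int[] input, int[] upperBounds,
                                   BinaryOperator<Integer> reducer) {
        int vertices = upperBounds.length + 1;
        Integer[] state = new Integer[vertices]; // one local reduce state per vertex
        List<List<Integer>> emitted = new ArrayList<>();
        for (int v = 0; v < vertices; v++) {
            emitted.add(new ArrayList<>());
        }
        for (int value : input) {
            int v = selectVertex(value, upperBounds);
            if (state[v] == null) {
                state[v] = value;
            } else {
                state[v] = reducer.apply(state[v], value);
            }
            emitted.get(v).add(state[v]); // one emit after every incoming value
        }
        return emitted;
    }
}
```

With bounds {10} and a max reducer, values below 10 are reduced on vertex 0
and the rest on vertex 1, so each local result is already the final result
for its range and no global step is needed.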

Simple Batch/Window reduce:

>The associativity assumption for reduce functions holds. This is also
>leveraged by the batch API, which uses reduce functions for local aggregates
>(combiners) as well as for global aggregates (reduce). For windowed
>streaming you can do the same thing. Do local preaggregations on windows,
>emit the partial result once a window is complete, and reinitialize the
>partial aggregator (start with no initial state). The global aggregator
>eagerly reduces a preaggregate with the last full aggregate (no windowing).
>Since preaggregators are reinitialized, the global aggregator does not
>need to hold individual state for the preaggregates and only keeps the
>last full aggregate.

Yes, I agree with the preaggregation approach. On second thought, this will
only make sense on windowed data streams where the window is defined by
system time. Anything else, for example a batch (a number of incoming
records), seems impossible to align across vertices unless the data stream
is 100% balanced, which will not be the case if the partitioning is not
shuffle. (So this is another issue.)
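
Here is a minimal sketch of the preaggregation scheme as I understand it, in
plain Java rather than the streaming API. The class names LocalPreAggregator
and GlobalAggregator are invented for illustration, and the sketch assumes
that something outside of it (e.g. system time) closes the windows on all
vertices together:

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

/** Local pre-aggregator: reduces the elements of one window on one parallel vertex. */
final class LocalPreAggregator<T> {
    private final BinaryOperator<T> reducer; // assumed to be associative
    private T partial;                       // null: no element seen in this window yet

    LocalPreAggregator(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    void add(T value) {
        partial = (partial == null) ? value : reducer.apply(partial, value);
    }

    /** Called when the window closes: emit the partial result and restart with no state. */
    Optional<T> closeWindow() {
        Optional<T> out = Optional.ofNullable(partial);
        partial = null;
        return out;
    }
}

/** Global aggregator (parallelism 1): keeps only the last full aggregate. */
final class GlobalAggregator<T> {
    private final BinaryOperator<T> reducer;
    private T full; // last full aggregate; no per-preaggregator state is needed

    GlobalAggregator(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    /** Eagerly reduces an incoming preaggregate with the last full aggregate. */
    T merge(T partialResult) {
        full = (full == null) ? partialResult : reducer.apply(full, partialResult);
        return full;
    }
}
```

The global vertex only sees one value per local vertex per window, which is
why it should not be a serious bottleneck unless the windows are very small.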

Batch/window groupreduce:
> You can do the same as for window.reduce if the GroupReduce function
>implements the combine interface (Combine functions must be
>associative!). In addition to the local preaggregates, the combine is also
>used on the global aggregate to further reduce the state by combining
>the preaggregates. The reduce function is only called with a single value
>(the combined preaggregates) to make sure that the result is correct.
>If the GroupReduce function does not implement a combine interface, I don't
>think that this can be done in a practical way (i.e., without caching the
>full stream).

Good call with the combine interface, I hadn't thought of that! The same
issue with non-time windows applies here as in the previous case.
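
A rough sketch of how I read the combine idea, again in plain Java. The
CombinableGroupReduce interface and the WindowGroupReduce helper are
hypothetical and are not the real API; they only illustrate the order of the
calls:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical interface: a group reduce that also offers an associative combine. */
interface CombinableGroupReduce<T, R> {
    T combine(Iterable<T> values); // must be associative
    R reduce(Iterable<T> values);  // may produce a different output type
}

final class WindowGroupReduce {

    /** partitions: the elements that each parallel vertex saw in the same window. */
    static <T, R> R globalResult(List<List<T>> partitions, CombinableGroupReduce<T, R> fn) {
        List<T> partials = new ArrayList<>();
        for (List<T> local : partitions) {
            if (!local.isEmpty()) {
                partials.add(fn.combine(local)); // local preaggregate on each vertex
            }
        }
        if (partials.isEmpty()) {
            throw new IllegalArgumentException("window contains no elements");
        }
        T combined = fn.combine(partials); // combine again on the global vertex
        // The reduce function is called with a single value only.
        return fn.reduce(Collections.singletonList(combined));
    }
}
```

Without the combine method the global vertex would have to see the whole
window as an iterable, which is exactly the caching problem mentioned above.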

Group reduces:
>I think you can play the preaggregation/combine tricks for windows here as
>well. So even in case of high data skew, you could do preaggregations for a
>single group with multiple combiners in parallel.

Good point, I hadn't thought about this. We assumed that the grouping would
give a balanced distribution, but you are right.


I suggest that we don't include these global reducers in the upcoming
release; it seems there are many ways the API could be implemented. I
think we should give it more thought and experiment with the different
approaches before we release it. What do you think?

Regards,
Gyula


On Thu, Sep 25, 2014 at 12:14 AM, Fabian Hueske <[email protected]> wrote:

> Hi, thanks for starting this discussion!
> I added some comments inline.
>
> 2014-09-24 11:43 GMT+02:00 Gyula Fóra <[email protected]>:
>
> > Hey All,
> >
> > I would like to bring up a discussion regarding the different reduce
> > functionalities in the streaming API, because there are several ways they
> > can (and cannot) be implemented efficiently which will work in totally
> > different ways.
> >
> > I will go through the different reduce types and their problems:
> >
> > *Simple reduce:*
> > dataStream.reduce(ReduceFunction)
> >
> > The current working mechanism is that it combines the incoming elements
> > using the ReduceFunction and emits the current reduced value (so after
> > every incoming value there is one emit with the reduced value). Currently,
> > if the reducer has a parallelism higher than 1, the reduced value is only
> > a 'local' reduce on the given vertex (there is no global reduce step). The
> > problem here is that introducing a global reduce step would be equivalent
> > to setting the reducer parallelism to 1, since we want to emit the current
> > reduced value after each incoming data point. So the question here is
> > whether the current mechanism makes sense without a global aggregate or
> > whether we should force parallelism 1.
> >
> >
> Not sure if there are use cases for multiple local reducers. I think I'd
> enforce DOP = 1 in this case.
>
>
> > This is also the case for aggregation operators.
> >
> >  *Simple Batch/Window reduce:*
> > dataStream.window(1000).reduce(ReduceFunction)
> >
> > The current working mechanism is that it combines incoming elements with
> > a ReduceFunction in windows/batches, currently also 'locally' on each
> > parallel vertex, emitting one reduced output after each window. Here the
> > same issue of global aggregation can be solved by introducing a global
> > aggregator vertex with parallelism 1, which wouldn't cause a serious
> > overhead if the windows are not too small.
> >
> > Another issue here is the assumptions we can make about the user-defined
> > ReduceFunction. If we assume that the function is associative (we
> > currently assume this) then the window reduce operators can be implemented
> > to be almost as fast as simple reduces by storing pre-reduced groups of
> > values. Do you think it is okay to make this assumption?
> >
> >
> The associativity assumption for reduce functions holds. This is also
> leveraged by the batch API, which uses reduce functions for local aggregates
> (combiners) as well as for global aggregates (reduce). For windowed
> streaming you can do the same thing. Do local preaggregations on windows,
> emit the partial result once a window is complete, and reinitialize the
> partial aggregator (start with no initial state). The global aggregator
> eagerly reduces a preaggregate with the last full aggregate (no windowing).
> Since preaggregators are reinitialized, the global aggregator does not
> need to hold individual state for the preaggregates and only keeps the
> last full aggregate.
>
>
> > *Batch/window groupreduce:*
> > dataStream.window(1000).reduceGroup(GroupReduceFunction)
> >
> > The difference between .reduce and .reduceGroup is that the user gets the
> > window/batch as an iterable, which can be quite useful in some cases. The
> > problem here is the same as with the simple reduce, that is, we couldn't
> > figure out how to make global aggregations efficient. Unlike with window
> > reduce, where we can create a global aggregator vertex, here that is
> > impossible because of the different working mechanics of the GroupReduce
> > function (iterable input with custom output type).
> >
> > So even if we make the window reduce global, the window groupreduce
> > will have to remain local if we don't want to enforce a parallelism=1
> > bottleneck. This would make the API ambiguous.
> >
> >
> You can do the same as for window.reduce if the GroupReduce function
> implements the combine interface (Combine functions must be
> associative!). In addition to the local preaggregates, the combine is also
> used on the global aggregate to further reduce the state by combining
> the preaggregates. The reduce function is only called with a single value
> (the combined preaggregates) to make sure that the result is correct.
> If the GroupReduce function does not implement a combine interface, I don't
> think that this can be done in a practical way (i.e., without caching the
> full stream).
>
>
>
> > *Grouped reduces*
> >
> > dataStream.groupBy(keyPos).reduce(ReduceFunction)
> > dataStream.groupBy(keyPos).window(1000).reduce/groupreduce
> >
> > Here we don't have the previous problems since local aggregations work as
> > globals.
> >
> >
> I think you can play the preaggregation/combine tricks for windows here as
> well. So even in case of high data skew, you could do preaggregations for a
> single group with multiple combiners in parallel.
>
>
> >
> > So any ideas regarding this global/local reduce issue and reduce function
> > associativity are appreciated :)
> >
> > Regards,
> > Gyula
>
>
> I hope I got everything right ;-)
>
> Cheers, Fabian
>
