Hey All,

I would like to start a discussion about the different reduce
functionalities in the streaming API, because there are several variants
that can (and cannot) be implemented efficiently, and they behave in quite
different ways.

I will go through the different reduce types and their problems:

*Simple reduce:*
dataStream.reduce(ReduceFunction)

The current working mechanism is that it combines the incoming elements
using the ReduceFunction and emits the current reduced value (so after
every incoming element there is one emit of the reduced value). Currently,
if the reducer has a parallelism higher than 1, the reduced value is only a
'local' reduce on the given vertex (there is no global reduce step). The
problem here is that introducing a global reduce step would be equivalent
to setting the reducer's parallelism to 1, since we want to emit the
current reduced value after each incoming data point. So the question here
is whether the current mechanism makes sense without a global aggregate, or
whether we should force parallelism 1.

This is also the case for aggregation operators.
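To make the per-element emission semantics concrete, here is a minimal sketch in plain Java (no Flink dependencies; the class and method names are hypothetical, not part of the API). It shows that a rolling reduce produces one output per input, which is why a parallelism-1 global step would have to touch every single element:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Sketch of the rolling reduce semantics described above (not Flink code).
// After every incoming element the operator emits the current reduced value,
// so n inputs produce n outputs.
public class RollingReduce {
    static <T> List<T> rollingReduce(List<T> input, BinaryOperator<T> reducer) {
        List<T> emitted = new ArrayList<>();
        T current = null;
        for (T element : input) {
            current = (current == null) ? element : reducer.apply(current, element);
            emitted.add(current); // one emit per incoming element
        }
        return emitted;
    }
}
```

For example, summing the stream 1, 2, 3 emits 1, 3, 6 -- a global version of this would receive (and emit for) every element of the stream, which is exactly the parallelism-1 bottleneck described above.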

*Simple Batch/Window reduce:*
dataStream.window(1000).reduce(ReduceFunction)

The current working mechanism is that it combines incoming elements with a
ReduceFunction in windows/batches, currently also 'locally' on each
parallel vertex, emitting one reduced output after each window. Here the
same issue of global aggregation can be solved by introducing a global
aggregator vertex with parallelism 1, which wouldn't cause serious overhead
as long as the windows are not too small, since the aggregator only
receives one pre-reduced value per parallel vertex per window.
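A plain-Java sketch of this two-level scheme (hypothetical names, not Flink code): each parallel vertex pre-reduces its share of the window locally, and the single global aggregator only combines one partial per vertex:

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Sketch of the two-level window reduce described above (not Flink code).
// Each inner list stands for the elements one parallel vertex collected for
// the window; the global aggregator (parallelism 1) combines only one
// pre-reduced partial per vertex, not the full stream.
public class WindowedGlobalReduce {
    static <T> T globalWindowReduce(List<List<T>> perVertexWindows,
                                    BinaryOperator<T> reducer) {
        T global = null;
        for (List<T> vertexWindow : perVertexWindows) {
            // local reduce on each parallel vertex
            T local = vertexWindow.stream().reduce(reducer).orElse(null);
            if (local == null) continue; // vertex saw no elements this window
            // global aggregator merges the per-vertex partials
            global = (global == null) ? local : reducer.apply(global, local);
        }
        return global;
    }
}
```

With a window of 1000 elements spread over 4 vertices, the global vertex handles 4 values per window instead of 1000, which is why the overhead stays small for reasonably sized windows.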

Another issue here is what assumptions we can make about the user-defined
ReduceFunction. If we assume that the function is associative (we currently
assume this), then the window reduce operators can be implemented to be
almost as fast as simple reduces by storing pre-reduced groups of values.
Do you think it is okay to make this assumption?
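To illustrate what goes wrong without that assumption, here is a small plain-Java sketch (hypothetical names): pre-reducing sub-groups and merging the partials only matches a plain left-to-right reduce when the function is associative. Subtraction, which is not associative, makes the two strategies diverge:

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Sketch of why the associativity assumption matters (not Flink code).
// Storing pre-reduced groups means the reducer is applied in a different
// grouping than strict left-to-right order; the results only agree for
// associative functions.
public class AssociativityCheck {
    // plain left-to-right reduce over the whole window
    static int leftFold(List<Integer> xs, BinaryOperator<Integer> f) {
        int acc = xs.get(0);
        for (int i = 1; i < xs.size(); i++) acc = f.apply(acc, xs.get(i));
        return acc;
    }

    // pre-reduce two stored groups, then merge the partials
    static int viaPreReducedGroups(List<Integer> group1, List<Integer> group2,
                                   BinaryOperator<Integer> f) {
        return f.apply(leftFold(group1, f), leftFold(group2, f));
    }
}
```

For sum over [1, 2, 3, 4] both strategies give 10, but for subtraction the left fold gives -8 while merging the pre-reduced groups [1, 2] and [3, 4] gives 0 -- so the optimization silently changes results for non-associative user functions.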

*Batch/window groupreduce:*
dataStream.window(1000).reduceGroup(GroupReduceFunction)

The difference between .reduce and .groupReduce is that the user gets the
window/batch as an iterable, which can be quite useful in some cases. The
problem here is the same as with the simple reduce: we couldn't figure out
how to make global aggregations efficient. Unlike with window reduce, where
we can create a global aggregator vertex, here that is impossible because
of the different working mechanics of the GroupReduce function (iterable
input with a custom output type).

So even if we make the window reduce global, the window groupReduce will
have to remain local if we don't want to enforce a parallelism-1
bottleneck. This would make the API ambiguous.


*Grouped reduces*

dataStream.groupBy(keyPos).reduce(ReduceFunction)
dataStream.groupBy(keyPos).window(1000).reduce/groupReduce

Here we don't have the previous problems: elements are partitioned by key,
so every record with a given key reaches the same parallel vertex, and the
local aggregations are therefore already global.
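A plain-Java sketch of why the keyed case needs no extra step (hypothetical names, not Flink code): each vertex keeps one rolling reduced value per key, and since a key never spans vertices, that local state is the global answer:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Sketch of grouped reduce semantics (not Flink code). The input list stands
// for the records routed to one parallel vertex; because partitioning by key
// sends all records of a key to the same vertex, this per-key local state is
// already the global per-key result.
public class KeyedReduce {
    static <K, V> Map<K, V> keyedReduce(List<Map.Entry<K, V>> input,
                                        BinaryOperator<V> reducer) {
        Map<K, V> statePerKey = new HashMap<>();
        for (Map.Entry<K, V> record : input) {
            // fold the new value into the rolling state for this key
            statePerKey.merge(record.getKey(), record.getValue(), reducer);
        }
        return statePerKey;
    }
}
```

For the records (a, 1), (b, 2), (a, 3) with sum, the state is {a=4, b=2} -- exactly what a global per-key reduce would produce, with no aggregator vertex needed.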


So any ideas regarding this global/local reduce issue and reduce function
associativity are appreciated :)

Regards,
Gyula
