Hey all,

I would like to start a discussion about the different reduce functionalities in the streaming API, because there are several ways they can (and cannot) be implemented efficiently, and these behave in quite different ways.
I will go through the different reduce types and their problems:

*Simple reduce:* dataStream.reduce(ReduceFunction)

The current mechanism combines the incoming elements using the ReduceFunction and emits the current reduced value, so after every incoming value there is one emit with the current reduction. If the reducer has a parallelism higher than 1, the reduced value is only a 'local' reduce on the given vertex; there is no global reduce step. The problem is that introducing a global reduce step would be equivalent to setting the reducer parallelism to 1, since we want to emit the current reduced value after each incoming data point. So the question here is whether the current mechanism makes sense without a global aggregate, or whether we should force parallelism 1. The same applies to the aggregation operators.

*Simple batch/window reduce:* dataStream.window(1000).reduce(ReduceFunction)

The current mechanism combines the incoming elements with a ReduceFunction in windows/batches, currently also 'locally' on each parallel vertex, emitting one reduced output after each window. Here the global aggregation issue can be solved by introducing a global aggregator vertex with parallelism 1, which wouldn't cause serious overhead as long as the windows are not too small. Another issue is what assumptions we can make about the user-defined ReduceFunction. If we assume the function is associative (we currently assume this), then the window reduce operators can be implemented to be almost as fast as simple reduces by storing pre-reduced groups of values. Do you think it is okay to make this assumption?

*Batch/window groupReduce:* dataStream.window(1000).reduceGroup(GroupReduceFunction)

The difference between .reduce and .groupReduce is that the user gets the window/batch as an Iterable, which can be quite useful in some cases.
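To make the distinction concrete, here is a plain-Java sketch of the two contracts. The interfaces are simplified stand-ins, not the actual Flink types: it shows why partial results of an associative ReduceFunction can be re-reduced on a global aggregator vertex, while the partial outputs of an Iterable-based group reduce (here: a median, which even has a different output type) cannot in general be merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReduceVsGroupReduce {

    // Pairwise, associative reducer: partial (local) results can simply be
    // reduced again with the same function on a global aggregator vertex.
    static int sum(int a, int b) {
        return a + b;
    }

    // Iterable-based "group reduce" with a different output type (Double).
    // There is no general way to merge two of its outputs into the result
    // over the union of the inputs.
    static double median(List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        return n % 2 == 1
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        // One window's worth of data, split across two parallel vertices.
        List<Integer> window = Arrays.asList(3, 1, 4, 1, 5);

        // Associative reduce: local reduces, then one global reduce step.
        int local1 = sum(sum(3, 1), 4);   // vertex 1 saw {3, 1, 4} -> 8
        int local2 = sum(1, 5);           // vertex 2 saw {1, 5}    -> 6
        int global = sum(local1, local2); // 14, same as reducing the whole window

        // Group reduce: the true median of the window is 3.0, but the local
        // results 2.0 (over {3, 1}) and 4.0 (over {4, 1, 5}) are Doubles, not
        // window elements, and no second median() call can combine them into 3.0.
        double trueMedian = median(window);
        double localA = median(window.subList(0, 2));
        double localB = median(window.subList(2, 5));

        System.out.println(global);     // 14
        System.out.println(trueMedian); // 3.0
        System.out.println(localA + " / " + localB); // 2.0 / 4.0
    }
}
```

This is essentially the combiner property from the batch world: a global aggregator vertex only works when the user function can also act as its own combiner, which the GroupReduceFunction contract does not guarantee.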
The problem here is the same as with the simple reduce: we couldn't figure out how to make global aggregations efficient. Unlike window reduce, where we can create a global aggregator vertex, here that is impossible because of the different working mechanics of the GroupReduceFunction (Iterable input with a custom output type). So even if we make the window reduce global, the window groupReduce will have to remain local unless we want to enforce a parallelism-1 bottleneck. This would make the API ambiguous.

*Grouped reduces:*
dataStream.groupBy(keyPos).reduce(ReduceFunction)
dataStream.groupBy(keyPos).window(1000).reduce/reduceGroup

Here we don't have the previous problems, since the local aggregations act as global ones (every element with a given key is reduced on the same vertex).

Any ideas regarding this global/local reduce issue and the ReduceFunction associativity assumption are appreciated :)

Regards,
Gyula
