A Case for Stronger Partial Aggregation Semantics in Hadoop

The current combiner implementation reduces the amount of network bandwidth required, but its semantics are weak. With stronger semantics for a combiner-like operation, greater performance improvements would be possible.
As currently implemented, a single map call may create zero or more output key-value pairs via a collector. A reduce call receives as input a key and a list of values, and must output zero or more key-value pairs via a collector. If defined, a combiner receives as input a key and the list of values with that key from a map output spill. The idea is that the combiner reduces all of these values for the particular key to one output key-value pair, thus reducing the total amount of data that must be transferred to a reducer. The combiner's semantics, however, are the same as the reducer's, and nothing prevents a programmer from implementing a combiner that changes the key or outputs more or fewer than one key-value pair. This leads to a number of limitations, chief among them that the combiner cannot be applied more than once, because there are no guarantees regarding the effects of repeated application (as implemented, the combiner could produce more than one output pair or change the key).

A summary of desirable semantics:

1. The map function produces as output partial aggregate values representing singletons.
2. A new combiner function explicitly performs partial-to-partial aggregation over one or more values, creating exactly one output value of the same type as the input values and leaving the key unchanged.
3. A reduce function takes as input partial aggregates and produces final values of any format.

This is very similar to what a database requires of user-defined aggregates. For example, in Informix (http://publib.boulder.ibm.com/infocenter/idshelp/v10/index.jsp?topic=/com.ibm.udr.doc/udr126.htm) the user must specify four functions: 'init', 'iter', 'combine', and 'final'. In Hadoop, 'init' and 'iter' are already performed by the map. The 'combine' function would be implemented by the new combiner function. Finally, reduce performs the 'final' aggregation.

This proposal requires a slightly more restrictive combiner, but the ability to apply the new combiner function repeatedly brings several benefits:

1. Rather than combining only within a mapper's output spill, one could repeat the process during the merge of spills, further reducing the amount of data to be transferred.
2. The reducer can be more aggressively pipelined, with partial aggregation occurring among the finished map outputs while the reducer waits for later map tasks to complete. In this manner, some of the aggregation can be pushed into the sort and merge phases.

A potential contract with the user for a more explicit partial-to-partial aggregate function would simply do away with the collector interface, pushing any writing logic to the calling function:

Writable partialToPartial(Iterator values, Reporter reporter) { ... }

The user must return one and only one value. The key could also be provided as a read-only parameter. As with the current combiner, a user need not provide a partial-to-partial aggregation function; it is only an optimization. A sketch of what such a function might look like appears at the end of this note.

I am currently working to get some performance numbers related to pushing aggregation into the copying and merging phases of reduce. I welcome any thoughts regarding this idea.
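To make the proposed contract concrete, here is a minimal sketch of a partial-to-partial sum, in the spirit of word count. The PartialAggregator interface and SumCombiner class are hypothetical names used only for illustration, not part of the existing Hadoop API; only Writable, IntWritable, Reporter, and Iterator are real Hadoop/Java types.

import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical interface for the proposed combiner contract: consume one
// or more partial aggregates and return exactly one partial aggregate of
// the same type, never touching the key.
interface PartialAggregator {
    Writable partialToPartial(Iterator values, Reporter reporter);
}

// Example implementation: partial-to-partial aggregation of counts.
// Because each call folds many partials into one partial of the same
// type, the framework may apply it repeatedly: within a spill, while
// merging spills, or over map outputs buffered at the reducer.
class SumCombiner implements PartialAggregator {
    public Writable partialToPartial(Iterator values, Reporter reporter) {
        int sum = 0;
        while (values.hasNext()) {
            sum += ((IntWritable) values.next()).get();
        }
        // Exactly one output value; by contract the key is unchanged.
        return new IntWritable(sum);
    }
}

Any aggregate whose partial step is associative (sum, count, min, max) fits this contract naturally, since its own outputs are valid inputs on a later application.

John Cieslewicz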