A Case for Stronger Partial Aggregation Semantics in Hadoop

The current combiner implementation can reduce the amount of network
bandwidth required, but its semantics are weak. With stronger semantics for
a combiner-like operation, greater performance improvements would be
possible.

As currently implemented, a single map call may create zero or more output
key-value pairs via a collector. A reducer call receives as input a key
and a list of values, and must output zero or more key-value pairs via a
collector. If defined, a combiner receives as input a key and the list of
values with that key from a map output spill. The idea is that the combiner
reduces all of the values for a particular key to one output key-value
pair, thus reducing the total amount of data that must be transferred to a
reducer. The combiner’s semantics, however, are the same as the reducer’s,
and there is nothing to prevent a programmer from implementing a combiner
that changes the key or outputs more or fewer than one key-value pair.
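
For concreteness, a sum-style combiner under the current contract (written
against the old org.apache.hadoop.mapred interfaces) is just another Reducer
implementation. The sketch below happens to emit exactly one pair and to
preserve its key, but nothing in the interface enforces either property:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// A well-behaved sum combiner: it emits exactly one pair per key and keeps
// the key, but the Reducer interface it implements does not require this.
public class SumCombiner extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    // The collector would just as happily accept several pairs or a new key.
    output.collect(key, new IntWritable(sum));
  }
}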

This leads to a number of limitations, chief among them that the combiner
cannot safely be applied more than once: there are no guarantees about the
effects of repeated application, since, as implemented, a combiner may
produce more than one output pair or change the key.

A summary of desirable semantics (a rough interface sketch follows the list):
   1 The map function produces as output partial aggregate values
      representing singletons.
   2 A new combiner function that explicitly performs partial-to-partial
      aggregation over one or more values, creating exactly one new output
      value of the same type as the input values and leaving the key
      unchanged.
   3 A reducer that takes as input partial aggregates and produces final
      values of any format.
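
One way to make those requirements explicit is an interface along the
following lines; the names are purely illustrative and not part of any
existing Hadoop API:

import java.util.Iterator;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical interface for the proposed partial-to-partial step. The
// framework, not the user, writes out the result, and the contract is
// exactly one output partial of the same type P for the given key.
public interface PartialAggregator<K extends WritableComparable, P extends Writable> {
  // Must return exactly one partial of type P and must not alter the key.
  P combine(K key, Iterator<P> partials);
}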

This is very similar to what a database requires of user-defined
aggregates. For example, in Informix
(http://publib.boulder.ibm.com/infocenter/idshelp/v10/index.jsp?topic=/com.ibm.udr.doc/udr126.htm)
the user must specify four functions: ‘init’, ‘iter’, ‘combine’, and
‘final’. In Hadoop, ‘init’ and ‘iter’ are already performed by the map. The
‘combine’ function would be implemented by the new combiner function.
Finally, reduce performs the ‘final’ aggregation.
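
As a concrete (and purely illustrative) example of that split, consider an
average: the map emits a singleton (sum, count) partial (‘init’ and ‘iter’),
partials are merged by adding sums and counts (‘combine’), and only the
reduce divides (‘final’). Averaging averages directly would be wrong, which
is exactly why the intermediate type must remain a partial:

// Illustrative only: a partial aggregate for computing an average.
public class AvgPartial {
  long sum;
  long count;

  AvgPartial(long sum, long count) { this.sum = sum; this.count = count; }

  // Map side ('init' + 'iter'): one input value becomes a singleton partial.
  static AvgPartial singleton(long x) { return new AvgPartial(x, 1); }

  // Combiner ('combine'): partial-to-partial, same type in and out.
  static AvgPartial combine(AvgPartial a, AvgPartial b) {
    return new AvgPartial(a.sum + b.sum, a.count + b.count);
  }

  // Reducer ('final'): partial to final value, any output type.
  static double toFinal(AvgPartial p) { return (double) p.sum / p.count; }
}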

This proposal requires a slightly more restrictive combiner, but with the
ability to apply this new combiner function repeatedly, one can obtain some
benefits (illustrated after the list), including:
   1 Rather than just combining within a mapper’s output spill, one could
      repeat the process during the merge of spills, further reducing the
      amount of data to be transferred.
   2 The reducer can be more aggressively pipelined, with partial
      aggregation occurring over map outputs that have already finished
      while the reducer waits for later map tasks to complete. In this
      manner, some of the aggregation can be pushed into the sort and
      merge phases.
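
Both benefits rest on the observation that a key-preserving, single-output
combine can be reapplied at any grouping without changing the final result.
A tiny plain-Java illustration, with a hypothetical combine helper for a sum
aggregate:

import java.util.Arrays;
import java.util.List;

public class RepeatedCombine {
  // Hypothetical partial-to-partial combine for a sum aggregate.
  static int combine(List<Integer> partials) {
    int sum = 0;
    for (int p : partials) {
      sum += p;
    }
    return sum;
  }

  public static void main(String[] args) {
    List<Integer> spill1 = Arrays.asList(3, 5);  // partials in one map spill
    List<Integer> spill2 = Arrays.asList(7, 2);  // partials in another spill

    // Combine within each spill, then again when the spills are merged...
    int merged = combine(Arrays.asList(combine(spill1), combine(spill2)));
    // ...which matches combining all of the values once at the reducer.
    int allAtOnce = combine(Arrays.asList(3, 5, 7, 2));

    System.out.println(merged == allAtOnce);  // prints: true (17 == 17)
  }
}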

A potential contract with the user for a more explicit partial-to-partial
aggregate function would simply do away with the collector interface,
pushing any writing logic to the calling function:
Writable partialToPartial(Iterator values, Reporter reporter) { … }
The user must return one and only one value. The key could also be provided
as a read-only parameter.
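
A sum aggregate under this contract might look like the sketch below; the
method signature follows the proposal above, while the class name and the
generic type on the iterator are my own additions:

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical user code against the proposed contract: no collector, one
// return value, and the key (if provided at all) would be read-only.
public class SumPartialToPartial {

  public Writable partialToPartial(Iterator<IntWritable> values, Reporter reporter) {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    // Exactly one output value, of the same type as the inputs.
    return new IntWritable(sum);
  }
}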

As with the current combiner, a user need not provide a partial-to-partial
aggregation function. It is only an optimization.

I am currently working to get some performance numbers related to pushing
aggregation into the copying and merging phase of reduce. I welcome any
thoughts regarding this idea.

John Cieslewicz
