On Nov 6, 2008, at 2:30 PM, Ricky Ho wrote:
> Hmmm, it sounds like the combiner is invoked after the map() process
> has completed for the file split.
No. The data path is complex, but the combiner is called when the map
outputs are being spilled to disk. Roughly: the map outputs key/value
pairs until the io.sort.mb buffer is full; the buffer contents are then
sorted and fed to the combiner, and the combiner's output is written to
disk as a spill. When there are enough spills on disk, the map merges
them together, calls the combiner again, and writes the result to disk.
When the map finishes, the final multi-level merge is done.
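A minimal sketch of that map-side path in plain Java, with no Hadoop dependency. The class MapSideSpiller and its parameters are hypothetical: bufferLimit counts records rather than megabytes as a stand-in for io.sort.mb, mergeFactor stands in for "enough spills on disk", a TreeMap plays the role of the sort, and spills are kept in memory instead of on disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical in-memory model of the map-side spill path; not Hadoop code.
// bufferLimit stands in for io.sort.mb (counted in records, not megabytes),
// and mergeFactor stands in for "enough spills on disk".
class MapSideSpiller {
    private final int bufferLimit;
    private final int mergeFactor;
    private final BinaryOperator<Integer> combiner;
    private final List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
    private final List<TreeMap<String, Integer>> spills = new ArrayList<>(); // "on disk"
    int maxBuffered = 0; // largest buffer size observed, never above bufferLimit

    MapSideSpiller(int bufferLimit, int mergeFactor, BinaryOperator<Integer> combiner) {
        if (bufferLimit < 1 || mergeFactor < 2) throw new IllegalArgumentException();
        this.bufferLimit = bufferLimit;
        this.mergeFactor = mergeFactor;
        this.combiner = combiner;
    }

    // Called for each map output record; spills as soon as the buffer is full.
    void collect(String key, int value) {
        buffer.add(Map.entry(key, value));
        maxBuffered = Math.max(maxBuffered, buffer.size());
        if (buffer.size() >= bufferLimit) spill();
    }

    // Sort the buffered records and run them through the combiner into one spill.
    private void spill() {
        if (buffer.isEmpty()) return;
        TreeMap<String, Integer> run = new TreeMap<>(); // TreeMap keeps keys sorted
        for (Map.Entry<String, Integer> e : buffer) run.merge(e.getKey(), e.getValue(), combiner);
        buffer.clear();
        spills.add(run);
        if (spills.size() >= mergeFactor) mergeSpills();
    }

    // Merge all spills into one, calling the combiner again for matching keys.
    // Simplification: this sketch combines on every merge, including the last.
    private void mergeSpills() {
        TreeMap<String, Integer> merged = new TreeMap<>();
        for (TreeMap<String, Integer> run : spills) run.forEach((k, v) -> merged.merge(k, v, combiner));
        spills.clear();
        spills.add(merged);
    }

    // When the map finishes: spill whatever is left, then do the final merge.
    TreeMap<String, Integer> close() {
        spill();
        mergeSpills();
        return spills.get(0);
    }
}
```

With a buffer of three records, a merge every two spills, and a summing combiner, the words "b a c a b a d" (each with value 1) end up as {a=3, b=2, c=1, d=1}, and no more than three records are ever buffered at once, which mirrors the io.sort.mb memory bound.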
Since the reduce side also does a multi-level merge, it also calls the
combiner whenever a merge is done (except for the final merge, which is
fed directly into the reduce).
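The reduce side can be modelled the same way. In this hypothetical sketch (not Hadoop's actual merger), each map's output arrives as a sorted run; intermediate merges of mergeFactor runs call the combiner, while the final merge skips the combiner and groups every value per key, in ascending key order, for reduce().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical model of the reduce-side multi-level merge; not Hadoop code.
class ReduceSideMerger {
    // Merge runs mergeFactor at a time, combining equal keys, until at most
    // mergeFactor runs remain for the final merge.
    static List<TreeMap<String, Integer>> intermediateMerge(
            List<TreeMap<String, Integer>> runs, int mergeFactor, BinaryOperator<Integer> combiner) {
        if (mergeFactor < 2) throw new IllegalArgumentException("mergeFactor must be >= 2");
        List<TreeMap<String, Integer>> current = new ArrayList<>(runs);
        while (current.size() > mergeFactor) {
            TreeMap<String, Integer> merged = new TreeMap<>();
            for (TreeMap<String, Integer> run : current.subList(0, mergeFactor)) {
                run.forEach((k, v) -> merged.merge(k, v, combiner)); // combiner runs here
            }
            List<TreeMap<String, Integer>> next = new ArrayList<>();
            next.add(merged);
            next.addAll(current.subList(mergeFactor, current.size()));
            current = next;
        }
        return current;
    }

    // Final merge: no combiner; all values for a key are grouped for reduce().
    static TreeMap<String, List<Integer>> finalMerge(List<TreeMap<String, Integer>> runs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (TreeMap<String, Integer> run : runs) {
            run.forEach((k, v) -> grouped.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
        }
        return grouped;
    }
}
```

With the runs {a=1, b=2}, {b=1, c=1}, {a=2}, {c=3}, a merge factor of 2, and a summing combiner, the intermediate merges leave two runs, and the final merge hands reduce() a=[3], b=[3], c=[1, 3]: the values for c were never combined because they only met in the final merge.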
> Does that mean that, before the combiner function starts, all the
> intermediate map() output will be kept in memory? Any comment on the
> memory footprint?
The memory is bounded by io.sort.mb.
> I think a sufficient condition is just to make sure the reduce task
> will not COMPLETE before all the map tasks have completed. We don't
> need to make sure the reduce task will not START before all map
> tasks have completed. This can be achieved easily by letting the
> iterator.next() call within the reduce() method block.
*Sigh* no. The reduce function is invoked once per unique key, and it
is called in ascending order of keys. The final map may return a's
when previously you've only seen b's and c's. Once you have called the
reduce with b, you can't later call it with a.
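A minimal sketch of that ordering argument, using a hypothetical OrderedReduceDriver class (not part of Hadoop). In this model, map outputs can be collected as each map finishes, but reduce() only runs once every map is done, because until then a pending map could still emit a key smaller than any seen so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical sketch of the reduce contract: reduce() is called once per
// unique key, in ascending key order, so it cannot start while a map that
// might still emit a smaller key is unfinished.
class OrderedReduceDriver {
    private final TreeMap<String, List<Integer>> grouped = new TreeMap<>();
    private final int totalMaps;
    private int finishedMaps = 0;

    OrderedReduceDriver(int totalMaps) { this.totalMaps = totalMaps; }

    // Accept the complete output of one finished map task.
    void mapFinished(List<Map.Entry<String, Integer>> mapOutput) {
        for (Map.Entry<String, Integer> e : mapOutput) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        finishedMaps++;
    }

    // Call reduce once per key in ascending order; refuse while maps are pending.
    void runReduce(BiConsumer<String, List<Integer>> reduce) {
        if (finishedMaps < totalMaps) {
            throw new IllegalStateException("a pending map may still emit a smaller key");
        }
        grouped.forEach(reduce);
    }
}
```

If only the map that emitted b and c has finished, runReduce refuses to start; once the second map adds a and b, reduce() is called for a, b, c in that order, with two values for b.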
> There is another potential issue in the reduce() API. Can you
> explain why we need to expose the OutputCollector to the reduce()
> method? For example, is it possible for the "key" in
> output.collect() to be different from the reduce method's key
> parameter? What happens if two reduce calls (started with different
> keys) write their output with the same key?
The reduce is allowed to have different input and output types. There
are *four* type parameters:
Reducer<KeyIn, ValueIn, KeyOut, ValueOut>
The output of the reduce is not re-sorted. If the reduce doesn't use
the same key as its input, the output of the reduce won't be sorted.
Duplicate keys in the reduce output (either within the same reduce or
across different ones) are not a problem for the framework.
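To illustrate the four type parameters, here is a simplified, hypothetical SimpleReducer interface (not Hadoop's own Reducer interface) and a reducer whose output key has a different type from its input key: it turns (word, counts) into (total, word).

```java
import java.util.List;
import java.util.Map;

// Hypothetical simplified reducer shape with four type parameters; the
// output list stands in for the OutputCollector.
interface SimpleReducer<KIn, VIn, KOut, VOut> {
    void reduce(KIn key, Iterable<VIn> values, List<Map.Entry<KOut, VOut>> output);
}

// Input: (word, counts). Output: (total count, word). The output key is an
// Integer, not the input String key, so the output is not in key order, and
// two words with the same total produce duplicate output keys.
class CountToWordReducer implements SimpleReducer<String, Integer, Integer, String> {
    @Override
    public void reduce(String word, Iterable<Integer> counts, List<Map.Entry<Integer, String>> output) {
        int total = 0;
        for (int c : counts) total += c;
        output.add(Map.entry(total, word));
    }
}
```

Reducing apple=[3], banana=[1, 2], cherry=[1] in ascending input-key order emits the output keys 3, 3, 1: unsorted and with a duplicate, neither of which the framework needs to fix.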
> However, this requires some change to the current Reducer
> interface. Currently the reduce() method is called once per key.
> We want it to be called once per map result (within the same
> key). What I mean is the following interface ...
There is a library that lets you run a chain of maps, if those are the
semantics you are looking for. For map/reduce, the sort is a very
fundamental piece. If you don't need the sort between map and reduce,
you can set the number of reduces to 0 and run much faster.
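With zero reduces (in Hadoop's Java API, set with setNumReduceTasks(0) on the job configuration), map output is written out without being sorted. The hypothetical MapOnlyChain class below sketches those semantics in plain Java: each map function in the chain is applied in turn, and records come out in input order with no sort or grouping by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a map-only job built from a chain of maps.
// There is no sort, shuffle, or grouping by key: output keeps input order.
class MapOnlyChain<T> {
    private final List<Function<T, List<T>>> maps = new ArrayList<>();

    // Append a map stage; each stage may emit zero or more records per input.
    MapOnlyChain<T> then(Function<T, List<T>> map) {
        maps.add(map);
        return this;
    }

    // Apply each stage in turn to every record, preserving record order.
    List<T> run(List<T> input) {
        List<T> records = input;
        for (Function<T, List<T>> map : maps) {
            List<T> next = new ArrayList<>();
            for (T r : records) next.addAll(map.apply(r));
            records = next;
        }
        return records;
    }
}
```

Splitting the lines "c b" and "a" into words and then upper-casing them yields [C, B, A]: the records stay in input order rather than being sorted into A, B, C.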
> Does it make sense?
Not really. Most map/reduce applications need the other semantics.
-- Owen