On Nov 6, 2008, at 2:30 PM, Ricky Ho wrote:

Hmmm, sounds like the combiner is invoked after the map() process has completed for the file split.

No. The data path is complex, but the combiner is called when the map outputs are being spilled to disk. Roughly, the map outputs key/value pairs until the io.sort.mb buffer is full; the contents are then sorted and fed to the combiner, and the output of the combiner is written to disk. When there are enough spills on disk, the framework merges them together, calls the combiner again, and writes the result to disk. When the map finishes, the final multi-level merge is done.

Since the reduce also does a multi-level sort, it will also call the combiner whenever a merge is done (other than the final merge, which is fed into the reduce).
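
A minimal sketch of the map-side path described above, in plain Java with no Hadoop dependency. The names SpillSketch, Pair, and BUFFER_CAPACITY are hypothetical; the buffer limit is a record count standing in for io.sort.mb, the combiner is assumed to be a simple sum, and the single-pass merge in finish() stands in for the real multi-level merge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class SpillSketch {
    // Hypothetical stand-in for io.sort.mb: a record count, not megabytes.
    static final int BUFFER_CAPACITY = 4;

    // One (key, value) pair emitted by map().
    static final class Pair {
        final String key;
        final int value;
        Pair(String key, int value) { this.key = key; this.value = value; }
    }

    // In-memory buffer of map outputs waiting to be spilled.
    private final List<Pair> buffer = new ArrayList<>();
    // Each spill is a sorted, combined run; a list stands in for files on disk.
    final List<TreeMap<String, Integer>> spills = new ArrayList<>();

    // Called for every pair the map emits; spills when the buffer is full.
    void collect(String key, int value) {
        buffer.add(new Pair(key, value));
        if (buffer.size() >= BUFFER_CAPACITY) {
            spill();
        }
    }

    // Sort the buffered pairs by key and combine (sum) the values per key.
    void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        TreeMap<String, Integer> run = new TreeMap<>();
        for (Pair p : buffer) {
            run.merge(p.key, p.value, Integer::sum);
        }
        spills.add(run);
        buffer.clear();
    }

    // When the map finishes: flush the buffer, then merge every spill,
    // running the combiner again on keys that occur in several spills.
    TreeMap<String, Integer> finish() {
        spill();
        TreeMap<String, Integer> merged = new TreeMap<>();
        for (TreeMap<String, Integer> run : spills) {
            run.forEach((k, v) -> merged.merge(k, v, Integer::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        SpillSketch task = new SpillSketch();
        for (String word : "b a b c a b a c b".split(" ")) {
            task.collect(word, 1);
        }
        System.out.println(task.finish());      // {a=3, b=4, c=2}
        System.out.println(task.spills.size()); // 3
    }
}
```

In this sketch memory use never exceeds BUFFER_CAPACITY records, which mirrors how the sort buffer, not the total map output, bounds the footprint.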

Does that mean that, before the combiner function starts, all the intermediate map() output is kept in memory? Any comment on the memory footprint?

The memory is bounded by io.sort.mb.

I think a sufficient condition is just to make sure the reduce task will not COMPLETE before all the map tasks have completed. We don't need to make sure the reduce task will not START before all map tasks have completed. This can be achieved easily by letting the iterator.next() call within the reduce() method block.

*Sigh* no. The reduce function is invoked once per unique key, in ascending order of keys. The final map may return a's when previously you've only seen b's and c's, so you can't call the reduce with the b's first: having done so, you couldn't later call it with the a's.
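
The ordering constraint can be illustrated with a small plain-Java sketch. ReduceOrderSketch and reduceCallOrder are hypothetical names, and a TreeMap stands in for the framework's sort; the point is only that the first key to reduce is unknown until the last map has reported.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

class ReduceOrderSketch {
    // Groups the keys from every map output and returns them in the order
    // reduce() would be called: once per unique key, ascending.
    static List<String> reduceCallOrder(List<List<String>> mapOutputs) {
        TreeMap<String, Integer> groups = new TreeMap<>();
        for (List<String> output : mapOutputs) {
            for (String key : output) {
                groups.merge(key, 1, Integer::sum);
            }
        }
        return new ArrayList<>(groups.keySet());
    }

    public static void main(String[] args) {
        List<String> map1 = Arrays.asList("b", "c", "b");
        List<String> map2 = Arrays.asList("c", "b");
        List<String> lastMap = Arrays.asList("a", "c");

        // With only the first two maps, "b" looks like the first key.
        System.out.println(reduceCallOrder(Arrays.asList(map1, map2)));
        // The last map contributes "a", which must be reduced before "b".
        System.out.println(reduceCallOrder(Arrays.asList(map1, map2, lastMap)));
    }
}
```

Had reduce() already been called for "b" after the first two maps, the call for "a" would arrive out of order, and the "c" emitted by the last map would be missing from any early call for "c".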

There is another potential issue in the reduce() API: can you explain why we need to expose the OutputCollector to the reduce() method? For example, is it possible for the "key" in output.collect() to be a different key from the reduce method parameter? What happens if two reduce calls (starting with different keys) write their output under the same key?

The reduce is allowed to have different input and output types. There are *four* type parameters.

Reducer<KeyIn, ValueIn, KeyOut, ValueOut>

The output of the reduce is not re-sorted, so if the reduce doesn't use the same key as the input, its output won't be sorted. Duplicate keys in the reduce output (either within the same reduce or across different ones) are not a problem for the framework.
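
A rough illustration of a reduce whose output key differs from its input key, written in plain Java. The Collector interface is a local stand-in, not Hadoop's OutputCollector, and ReduceOutputSketch and run are hypothetical names; the sketch assumes the input is already grouped and sorted by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReduceOutputSketch {
    // Local stand-in for an output collector; not the real Hadoop interface.
    interface Collector<K, V> {
        void collect(K key, V value);
    }

    // KeyIn=String, ValueIn=Integer, KeyOut=Integer, ValueOut=String:
    // sums the counts for a word and emits (total, word).
    static void reduce(String word, Iterator<Integer> counts,
                       Collector<Integer, String> output) {
        int total = 0;
        while (counts.hasNext()) {
            total += counts.next();
        }
        output.collect(total, word);
    }

    // Calls reduce() once per key in ascending key order and records the
    // output exactly as emitted, with no further sorting.
    static List<String> run(TreeMap<String, List<Integer>> grouped) {
        List<String> written = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            reduce(e.getKey(), e.getValue().iterator(),
                   (k, v) -> written.add(k + "\t" + v));
        }
        return written;
    }

    public static void main(String[] args) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        grouped.put("a", Arrays.asList(1, 1, 1));
        grouped.put("b", Arrays.asList(1));
        grouped.put("c", Arrays.asList(1, 1, 1));
        // Output keys follow input key order: 3, 1, 3 (unsorted, with a duplicate).
        run(grouped).forEach(System.out::println);
    }
}
```

The output keys come out as 3, 1, 3: neither sorted nor unique, and nothing in the sketch (or, per the reply above, in the framework) objects to that.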

However, this requires some change of the current Reducer interface. Currently the reduce() method is called once per key. We want that to be called once per map result (within the same key). What I mean is the following interface ...

There is a library that lets you run a chain of maps, if those are the semantics you are looking for. For map/reduce, the sort is a very fundamental piece. If you don't need a sort between map and reduce, you can set reduces = 0 and run much faster.
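
What running with zero reduces amounts to can be sketched in plain Java. MapOnlySketch, map, and runMapOnly are hypothetical names and the upper-casing map is only a placeholder; in the old org.apache.hadoop.mapred API the setting itself would presumably be made with JobConf.setNumReduceTasks(0).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MapOnlySketch {
    // Hypothetical map(): upper-cases one input record.
    static String map(String record) {
        return record.toUpperCase();
    }

    // With zero reduces there is no sort step in this sketch: each map
    // output is written as soon as it is produced, in input order.
    static List<String> runMapOnly(List<String> split) {
        List<String> written = new ArrayList<>();
        for (String record : split) {
            written.add(map(record));
        }
        return written;
    }

    public static void main(String[] args) {
        // Input order is preserved because nothing is sorted.
        System.out.println(runMapOnly(Arrays.asList("c", "a", "b")));
    }
}
```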

Does it make sense?

Not really. Most map/reduce applications need the other semantics.

-- Owen
