Hi,
As a relatively new user of Hadoop, I am trying to construct an architectural 
picture of how the Hadoop processes interact when performing a Map/Reduce job.  
I haven't found much documentation on it.  Can someone suggest any links or 
documents that describe this level of detail?
Here I am making some guesses based on what I've seen from the API, the admin 
interface, and what the simplest possible implementation could be.  I haven't 
looked at the source code, and I know my guesses are probably wrong because they 
assume the simplest, least sophisticated implementation.  I am just trying to lay 
out the ground for the experts who are familiar with the underlying implementation 
to correct me.
So here is my guess ... I would appreciate it if anyone with actual 
implementation knowledge could correct me.

1.       To start the map/reduce cluster, run the "start-mapred.sh" script, 
which, based on the "hadoop-site.xml" file, starts (via SSH) one "JobTracker" 
process and multiple "TaskTracker" processes across multiple machines.  (Of 
course, here I assume HDFS is already started.)

2.       Now the client program submits a Map/Reduce job by creating a "JobConf" 
object and invoking "JobClient.runJob(jobConf)".  This call uploads the 
Mapper and Reducer implementation classes, as well as the job configuration, to the 
JobTracker daemon.  (At this moment, I expect the directory of the input path 
to be frozen.  In other words, no new files can be created in or removed from the 
input path.)
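
To make step 2 concrete, here is a minimal submission sketch against the 
org.apache.hadoop.mapred API as I understand it.  MyJob, MyMapper and MyReducer 
are hypothetical word-count-style classes of my own (the latter two are sketched 
under steps 5 and 11), and the input/output paths are just placeholders:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class MyJob {
  public static void main(String[] args) throws Exception {
    // Describe the job: which classes to run and where the data lives.
    JobConf conf = new JobConf(MyJob.class);
    conf.setJobName("wordcount-sketch");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MyMapper.class);      // step 5b
    conf.setCombinerClass(MyReducer.class);   // the combiner of step 5c
    conf.setReducerClass(MyReducer.class);    // step 11
    conf.setNumReduceTasks(4);                // the "jobConf.numReduceTask" of step 8

    FileInputFormat.setInputPaths(conf, new Path("/user/ricky/input"));   // placeholder
    FileOutputFormat.setOutputPath(conf, new Path("/user/ricky/output")); // placeholder

    // The call from step 2; it submits the job and blocks until it finishes.
    JobClient.runJob(conf);
  }
}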

3.       After assigning a unique job ID, the JobTracker looks at the 
"jobConf.inputPath" to determine how many TaskTrackers are needed for the 
map phase, based on the number of files in the input path as well as how 
busy the existing "TaskTrackers" are.  It then selects a number of 
"Map-Phase-TaskTrackers" that are relatively idle and physically close to 
the HDFS nodes that host a copy of the data (e.g. on the same machine or the 
same rack); see the split-listing sketch after step 4.

4.       Based on some scheduling algorithm, the JobTracker determines which 
files (from the input path) to assign to each selected Map-Phase-TaskTracker.  
It sends each TaskTracker the job ID, the Mapper implementation class 
bytecode, and the names of the assigned input files.
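
Steps 3 and 4 can at least be peeked at through the public InputFormat API.  The 
sketch below is not the JobTracker's own code, just a way to print the input 
splits and the hosts that hold a local copy of each one, which is presumably the 
information behind the placement guess in step 3 (ShowSplits and the path are 
made up by me):

import java.util.Arrays;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

public class ShowSplits {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(ShowSplits.class);
    FileInputFormat.setInputPaths(conf, new Path("/user/ricky/input")); // placeholder

    // TextInputFormat by default; getSplits() carves the input files into splits.
    InputFormat inputFormat = conf.getInputFormat();
    InputSplit[] splits = inputFormat.getSplits(conf, conf.getNumMapTasks());

    for (InputSplit split : splits) {
      // Each split reports the datanodes that hold its data locally.
      System.out.println(split + " -> " + Arrays.toString(split.getLocations()));
    }
  }
}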

5.       Each Map-Phase-TaskTracker process spawns multiple threads, one 
for each assigned input file (so there is a 1-to-1 correspondence between 
threads and files).  The TaskTracker also monitors the progress of each thread 
associated with this specific job ID.  The map phase is now started ...

a.       Each thread starts to read its assigned input file "sequentially", 
one record at a time, using the "InputFormat" class specified in the jobConf.

b.      For each record it reads, it invokes the map() method of the uploaded 
Mapper implementation class.  Whenever the "output.collect(key, value)" method is 
called, a record is added to an in-memory mapResultHashtable.  This step is 
repeated until the whole input file is consumed (EOF is reached).  (A Mapper 
sketch follows sub-step c.)

c.       The thread then inspects the mapResultHashtable.  For each key, it 
invokes the reduce() method of the uploaded combiner class.  Whenever the 
"output.collect(key, value)" method is called, a record is added to an 
in-memory combineResultHashtable.  The thread then persists the 
combineResultHashtable to a local file (not HDFS).  Finally, the thread quits.
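
Here is the kind of Mapper I have in mind for steps 5a/5b, written against the 
org.apache.hadoop.mapred API (a word-count-style class; MyMapper is my own 
hypothetical name).  The framework calls map() once per record handed over by the 
RecordReader of the configured InputFormat, and every output.collect() emits one 
intermediate (key, value) pair:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class MyMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // One call per input record (here: one line of text).
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, ONE); // step 5b: emit an intermediate (key, value) pair
    }
  }
}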

6.       When all the threads associated with the job ID have quit, the 
Map-Phase-TaskTracker sends a "map_complete" notification to the JobTracker.

7.       When the JobTracker has received all the "map_complete" notifications, it 
knows the map phase is complete.  Now it is time to start the "reduce" phase.

8.       The JobTracker looks at "jobConf.numReduceTask" to determine how 
many Reduce-Phase-TaskTrackers are needed.  It also selects (randomly) 
those Reduce-Phase-TaskTrackers.  To each of them, the JobTracker sends 
the job ID as well as the Reducer implementation class bytecode.

9.       Now, to each of the previous Map-Phase-TaskTrackers, the JobTracker 
sends a "reduce_ready" message along with an array of addresses of the 
Reduce-Phase-TaskTrackers.  Each Map-Phase-TaskTracker then starts a thread.

10.   The thread iterates over the persisted combineResultHashtable.  For each key, 
it invokes the partitioner class's getPartition() function to determine the 
partition number, and from that the address of the Reduce-Phase-TaskTracker.  It 
then opens a network connection to that Reduce-Phase-TaskTracker and passes along 
the key and values.
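
For step 10, my understanding is that the default partitioner simply hashes the 
key modulo the number of reduce tasks.  A sketch of such a partitioner against 
the old API (MyPartitioner is a made-up name of mine, not the Hadoop class 
itself):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class MyPartitioner<K, V> implements Partitioner<K, V> {

  public void configure(JobConf job) {
    // No configuration needed for a plain hash-based scheme.
  }

  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit so the result is always a valid partition index.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}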

11.   At each Reduce-Phase-TaskTracker, for each new key received, it spawns a 
new thread.  This thread invokes the Reducer.reduce() method (so there is a 
1-to-1 correspondence between threads and unique keys).  The 
Reduce-Phase-TaskTracker also monitors the progress of each thread associated 
with this specific job ID.

a.       Within the reduce() method, whenever the "output.collect(key, value)" 
method is called, a record is written using the "OutputFormat" class specified 
in the jobConf.  (A Reducer sketch follows sub-step b.)

b.      When the reduce() method returns, the thread quits.
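
And a matching word-count-style Reducer for steps 11a/11b, again against the 
org.apache.hadoop.mapred API (MyReducer is my own hypothetical name; the same 
class is what I would register as the combiner in step 5c):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class MyReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Sum all counts seen for this key.
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    // Step 11a: each collect() is written out via the configured OutputFormat.
    output.collect(key, new IntWritable(sum));
  }
}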

12.   When all the reducer threads associated with the job ID have completed, the 
TaskTracker sends a "reduce_complete" notification to the JobTracker.

13.   When the JobTracker has received all the "reduce_complete" notifications, it 
knows the whole job is complete.
Rgds, Ricky
