Hi,

As a relatively new user of Hadoop, I am trying to build an architectural picture of how the Hadoop processes interact when performing a Map/Reduce job. I haven't found much documentation on this. Can someone suggest any links or documents that describe it in detail?

In the meantime, here are my guesses, based on what I've seen in the API and the admin interface, and on what the simplest possible implementation might look like. I haven't looked at the source code, and I expect my guesses are wrong in places because they assume a simple, unsophisticated implementation. I'm just trying to lay the groundwork so that experts familiar with the underlying implementation can correct me. So here is my guess ... I'd appreciate it if anyone with actual knowledge of the implementation can set me straight.
1. To start the Map/Reduce cluster, you run the "start-mapred.sh" script, which, based on the "hadoop-site.xml" file, starts (via SSH) one "JobTracker" process and multiple "TaskTracker" processes across the cluster machines. (Of course, I assume here that HDFS has already been started.)

2. The client program submits a Map/Reduce job by creating a "JobConf" object and invoking "JobClient.runJob(jobConf)". This call uploads the Mapper and Reducer implementation classes, as well as the job configuration, to the JobTracker daemon. (At this moment, I expect the input path directory is frozen; in other words, no files can be added to or removed from the input path.) A minimal sketch of this client-side call appears after the list.

3. After assigning a unique job id, the JobTracker looks at the "jobConf.inputPath" to determine how many TaskTrackers are needed for the map phase, based on the number of files in the input path as well as how busy the existing TaskTrackers are. It then selects a number of "Map-Phase-TaskTrackers" that are relatively idle and physically close to the HDFS nodes hosting a copy of the data (e.g. on the same machine or in the same rack).

4. Based on some scheduling algorithm, the JobTracker assigns multiple files (from the input path) to each selected Map-Phase-TaskTracker. It sends each TaskTracker the job id, the Mapper implementation class bytecode, and the names of the assigned input files.

5. Each Map-Phase-TaskTracker process spawns multiple threads, one for each assigned input file (so there is a 1-to-1 correspondence between threads and files). The TaskTracker also monitors the progress of each thread associated with this specific job id. The map phase now starts ...
a. Each thread reads its assigned input file sequentially, one record at a time, using the "InputFormat" class specified in the jobConf.
b. For each record it reads, it invokes the uploaded Mapper implementation class's map() method (see the Mapper sketch after the list). Whenever the "output.collect(key, value)" method is called, a record is added to an in-memory mapResultHashtable. This step repeats until the whole input file is consumed (EOF is reached).
c. The thread then inspects the mapResultHashtable. For each key, it invokes the uploaded combiner class's reduce() method. Whenever the "output.collect(key, value)" method is called, a record is added to an in-memory combineResultHashtable. The thread then persists the combineResultHashtable to a local file (not HDFS). Finally the thread quits.

6. When all the threads associated with the job id have quit, the Map-Phase-TaskTracker sends a "map_complete" notification to the JobTracker.

7. When the JobTracker has received all the "map_complete" notifications, it knows the map phase is complete. Now it is time to start the "reduce" phase.

8. The JobTracker looks at "jobConf.numReduceTask" to determine how many Reduce-Phase-TaskTrackers are needed, and selects them (randomly). To each of them, the JobTracker sends the job id as well as the Reducer implementation class bytecode.

9. Now, to each of the previous Map-Phase-TaskTrackers, the JobTracker sends a "reduce_ready" message along with an array of addresses of the Reduce-Phase-TaskTrackers. Each Map-Phase-TaskTracker then starts a thread.

10. This thread iterates over the persisted combineResultHashtable. For each key, it invokes the partitioner class's getPartition() function to determine the partition number, and from that the address of the target Reduce-Phase-TaskTracker (see the partitioner sketch after the list),
and then opens a network connection to that Reduce-Phase-TaskTracker and passes along the key and its values.

11. At each Reduce-Phase-TaskTracker, a new thread is spawned for each new key received. This thread invokes the Reducer.reduce() method (so there is a 1-to-1 correspondence between threads and unique keys; see the Reducer sketch after the list). The Reduce-Phase-TaskTracker also monitors the progress of each thread associated with this specific job id.
a. Within the reduce() method, whenever the "output.collect(key, value)" method is called, a record is written using the "OutputFormat" class specified in the jobConf.
b. When the reduce() method returns, the thread quits.

12. When all the reducer threads associated with the job id have completed, the TaskTracker sends a "reduce_complete" notification to the JobTracker.

13. When the JobTracker has received all the "reduce_complete" notifications, it knows the whole job is complete.
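To make step 2 concrete, here is a rough sketch of what I understand the client-side submission to look like, using the org.apache.hadoop.mapred API. The class names (MyJobDriver, MyMapper, MyReducer) and the word-count-style job are made up for illustration; only the JobConf/JobClient calls themselves are from the real API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class MyJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MyJobDriver.class);
    conf.setJobName("my-guessed-wordcount");

    // The mapper/combiner/reducer classes to run; they are shipped with the job (step 2).
    conf.setMapperClass(MyMapper.class);
    conf.setCombinerClass(MyReducer.class);   // combiner reuses the reducer logic
    conf.setReducerClass(MyReducer.class);

    // The InputFormat/OutputFormat referred to in steps 5a and 11a.
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Blocks until the job finishes, polling the JobTracker for progress.
    JobClient.runJob(conf);
  }
}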
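For step 5b, here is the kind of Mapper class I have in mind (again a made-up word-count example). My understanding is that the framework drives the record-reading loop through the configured InputFormat and simply calls map() once per record:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MyMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  // Called once per input record (here: byte offset + line of text from TextInputFormat).
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    for (String token : value.toString().split("\\s+")) {
      if (token.length() == 0) {
        continue;
      }
      word.set(token);
      output.collect(word, ONE);   // emit an intermediate (key, value) pair
    }
  }
}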
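For step 10, here is a sketch of a partitioner in the style of the default hash partitioner, i.e. my assumption of how getPartition() maps an intermediate key to one of the reduce partitions:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class MyPartitioner implements Partitioner<Text, IntWritable> {

  public void configure(JobConf job) {
    // no per-job configuration needed for this sketch
  }

  // Maps an intermediate key to one of numReduceTasks partitions,
  // i.e. to one particular reduce task.
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

(If a custom partitioner like this were wanted, I believe the driver above would register it with conf.setPartitionerClass(MyPartitioner.class).)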
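And for step 11, a matching Reducer sketch; I assume the same class can double as the combiner from step 5c since it just sums counts:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MyReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  // Called once per unique key with an iterator over all values for that key.
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    // Each collect() here is written out via the OutputFormat named in the JobConf.
    output.collect(key, new IntWritable(sum));
  }
}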
Rgds,
Ricky