I added multi-threading to the map phase of the LocalJobRunner. The code is in 
the attached patch.
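
The patch sizes the map thread pool from the existing
mapred.tasktracker.tasks.maximum property (defaulting to 2), so with the patch
applied something like the following should keep all cores busy. This is only
an untested sketch; WordCount.class is a placeholder for your own job class:

  import org.apache.hadoop.mapred.JobConf;

  JobConf job = new JobConf(WordCount.class);          // placeholder job class
  job.set("mapred.job.tracker", "local");              // stay in the LocalJobRunner
  job.setInt("mapred.tasktracker.tasks.maximum", 8);   // threads in the map pool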

What I also noticed during my experiments is that there is enough load to 
easily fill 8 cores, even though my job should be I/O-bound. I have the 
feeling that the SequenceFile code or the framework wastes CPU cycles somewhere.
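
To see whether plain SequenceFile reading alone already burns a core, I would
probably time a bare read loop outside of MapReduce, roughly like the untested
sketch below (the input file is just whatever part file you want to measure):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.util.ReflectionUtils;

  public class SeqFileReadBench {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.getLocal(conf);     // data lives on the local raid
      Path file = new Path(args[0]);                 // e.g. one part-00000 file
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
      Writable key = (Writable)
        ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable)
        ReflectionUtils.newInstance(reader.getValueClass(), conf);
      long records = 0;
      long start = System.currentTimeMillis();
      while (reader.next(key, value)) {              // deserialization/decompression only
        records++;
      }
      reader.close();
      System.out.println(records + " records in "
          + (System.currentTimeMillis() - start) + " ms");
    }
  }

If this alone pegs a core, the cycles are going into deserialization or
decompression rather than into the framework.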

Thorsten

On Saturday 18 August 2007, Thorsten Schuett wrote:
> Hi,
>
> first of all, thanks for Hadoop. It's amazing how much you can get done
> with a small Hadoop job.
>
> My setup is a little bit different from the usual. I have a mid-sized
> Opteron machine with the data resting on a local RAID. I configured
> LocalFileSystem and 2 map + 2 reduce tasks per core.
>
> During the reduce phase I see rather slow copy rates in the web interface
> and <50% CPU usage in total. vmstat shows that Hadoop constantly reads
> ~10-20 MB/s and writes in short bursts at higher speeds (>100 MB/s).
> Neither the disks nor the CPUs seem to be the bottleneck.
>
> What's interesting, though, is the traffic on the loopback device. There is
> constant traffic in the same order as the read rate mentioned above. Please
> correct me if I am wrong, but it looks like Hadoop is using the RPC
> mechanism to copy the map output files to the reduce task (in this case via
> the loopback device). If my assumptions are correct, would it be possible
> to read/access the files directly in the "one-node mode"?
>
> Thanks,
>   Thorsten
Index: src/java/org/apache/hadoop/mapred/LocalJobRunner.java
===================================================================
--- src/java/org/apache/hadoop/mapred/LocalJobRunner.java	(Revision 567691)
+++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java	(working copy)
@@ -20,9 +20,16 @@
 
 import java.io.IOException;
 import java.text.NumberFormat;
+import java.util.List;
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.HashMap;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,7 +46,8 @@
   private FileSystem fs;
   private HashMap<String, Job> jobs = new HashMap<String, Job>();
   private JobConf conf;
-  private int map_tasks = 0;
+  private AtomicInteger map_tasks = new AtomicInteger();
+  private AtomicReference<Throwable> mapFailed = new AtomicReference<Throwable>();
   private int reduce_tasks = 0;
 
   private JobTrackerMetrics myMetrics = null;
@@ -47,7 +55,35 @@
   public long getProtocolVersion(String protocol, long clientVersion) {
     return JobSubmissionProtocol.versionID;
   }
-  
+
+  private class MapTaskRunner implements Runnable{
+    private MapTask map;
+    private JobConf localConf;
+    private Job umbilical;
+
+    public MapTaskRunner(MapTask map, JobConf localConf, Job umbilical){
+      this.map = map;
+      this.localConf = localConf;
+      this.umbilical = umbilical;
+    }
+
+    public void run(){
+      try{
+	map_tasks.incrementAndGet();
+	myMetrics.launchMap();
+	map.run(localConf, umbilical);
+	myMetrics.completeMap();
+	map_tasks.decrementAndGet();
+	map.saveTaskOutput();
+	umbilical.updateCounters(map);
+      }catch(Throwable e){
+	System.out.println(e);
+	e.printStackTrace();
+	mapFailed.compareAndSet(null, e);
+      }
+    }
+  }
+
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
     private Path file;
@@ -61,6 +97,8 @@
     private JobProfile profile;
     private Path localFile;
     private FileSystem localFs;
+    private Map<String, Float> progressReports = new HashMap<String, Float>();
+    private float totalProgress = 0.0f;
     
     // Counters summed over all the map/reduce tasks which
     // have successfully completed
@@ -101,7 +139,7 @@
       try {
         // split input into minimum number of splits
         InputSplit[] splits;
-        splits = job.getInputFormat().getSplits(job, 1);
+        splits = job.getInputFormat().getSplits(job, conf.getInt("mapred.tasktracker.tasks.maximum", 2));
         String jobId = profile.getJobId();
         
         int numReduceTasks = job.getNumReduceTasks();
@@ -110,7 +148,11 @@
           numReduceTasks = 1;
           job.setNumReduceTasks(1);
         }
+
         DataOutputBuffer buffer = new DataOutputBuffer();
+	ExecutorService pool = Executors.newFixedThreadPool(conf.getInt("mapred.tasktracker.tasks.maximum", 2));
+	Future<?>[] futures = new Future<?>[splits.length];
+	LOG.info("map tasks: " + splits.length);
         for (int i = 0; i < splits.length; i++) {
           String mapId = "map_" + idFormat.format(i); 
           mapIds.add(mapId);
@@ -125,14 +167,29 @@
           JobConf localConf = new JobConf(job);
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
-          map_tasks += 1;
-          myMetrics.launchMap();
-          map.run(localConf, this);
-          map.saveTaskOutput();
-          myMetrics.completeMap();
-          map_tasks -= 1;
-          updateCounters(map);
+          //map_tasks += 1;
+          //myMetrics.launchMap();
+	  progressReports.put(mapId, 0.0f);
+	  futures[i] = pool.submit(new MapTaskRunner(map, localConf, this));
+          //map.run(localConf, this);
+          //map.saveTaskOutput();
+          //myMetrics.completeMap();
+          //map_tasks -= 1;
+          //updateCounters(map);
         }
+
+	boolean finished = false;
+	/* wait for execution finished or mapFailed */
+	while(!finished){
+	    Thread.sleep(5000);
+	    if(mapFailed.get() != null)
+	      throw (Throwable)mapFailed.get();
+	    finished = true;
+	    for(int i = 0; i < futures.length; i++)
+		finished = finished && futures[i].isDone();
+	}
+	pool.shutdown();
+	
         String reduceId = "reduce_" + newId();
         try {
           if (numReduceTasks > 0) {
@@ -205,11 +262,14 @@
 
     public boolean progress(String taskId, float progress, String state, 
                          TaskStatus.Phase phase, Counters taskCounters) {
-      LOG.info(state);
+	//LOG.info(state);
       float taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
         float numTasks = mapIds.size();
-        status.setMapProgress(taskIndex/numTasks + progress/numTasks);
+	totalProgress = totalProgress - progressReports.get(taskId) + progress / mapIds.size();
+	progressReports.put(taskId, progress / mapIds.size());
+        //status.setMapProgress(taskIndex/numTasks + progress/numTasks);
+        status.setMapProgress(totalProgress);
       } else {
         status.setReduceProgress(progress);
       }
@@ -240,7 +300,10 @@
     public void done(String taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
-        status.setMapProgress(1.0f);
+	totalProgress = totalProgress - progressReports.get(taskId) + 1.0f / mapIds.size();
+	progressReports.put(taskId, 1.0f / mapIds.size());
+        status.setMapProgress(totalProgress);
+        //status.setMapProgress(status.mapProgress() + 1.0f / mapIds.size());
       } else {
         status.setReduceProgress(1.0f);
       }
@@ -306,7 +369,7 @@
   }
   
   public ClusterStatus getClusterStatus() {
-    return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
+    return new ClusterStatus(1, map_tasks.get(), reduce_tasks, 1);
   }
 
   public JobStatus[] jobsToComplete() {return null;}
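
A possible alternative to the sleep/poll loop in run() above would be to block
on the futures directly; inside the same try/catch(Throwable) block, something
like this (untested, needs java.util.concurrent.ExecutionException imported):

  for (Future<?> f : futures) {
    try {
      f.get();                      // wait for this map task to finish
    } catch (ExecutionException e) {
      throw e.getCause();           // surface the original map failure
    }
  }
  pool.shutdown();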
