I added multi-threading to the map phase of the LocalJobRunner. The code is in
the attached patch.
What I also noticed during my experiments: the job easily fills 8 cores, even
though my code should be I/O-bound. I have the feeling that the SequenceFile
code or the framework wastes CPU cycles somewhere.
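To narrow that down, I would time the bare SequenceFile read path with
something like the sketch below (untested; the input path comes from the
command line, and it assumes the usual Writable key/value classes). If one
core stays pegged while it runs, the cycles go into reading and deserializing
rather than into the map function itself:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SeqFileReadBench {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path input = new Path(args[0]); // one of the map input files
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, input, conf);
    Writable key = (Writable)
      ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable)
      ReflectionUtils.newInstance(reader.getValueClass(), conf);
    long start = System.currentTimeMillis();
    long records = 0;
    while (reader.next(key, value)) { // pure read + deserialization
      records++;
    }
    reader.close();
    System.out.println(records + " records in "
      + (System.currentTimeMillis() - start) + " ms");
  }
}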
Thorsten
On Saturday 18 August 2007, Thorsten Schuett wrote:
> Hi,
>
> first of all, thanks for Hadoop. It's amazing how much you can get done
> with a small Hadoop job.
>
> My setup is a little bit different from the usual. I have a mid-sized
> Opteron machine with the data resting on a local RAID. I configured
> LocalFileSystem and 2 map + 2 reduce tasks per core.
>
> During the reduce phase I see rather slow copy rates in the web interface
> and <50% CPU usage in total. vmstat shows that Hadoop constantly reads
> ~10-20 MB/s and writes in short bursts at higher speeds (>100 MB/s).
> Neither the disks nor the CPUs seem to be the bottleneck.
>
> What's interesting, though, is the traffic on the loopback device. There is
> constant traffic of the same order as the read rate mentioned above. Please
> correct me if I am wrong, but it looks like Hadoop is using the RPC
> mechanism to copy the map output files to the reduce task (in this case via
> the loopback device). If my assumptions are correct, would it be possible
> to read/access the files directly in the "one-node mode"?
>
> Thanks,
> Thorsten
Index: src/java/org/apache/hadoop/mapred/LocalJobRunner.java
===================================================================
--- src/java/org/apache/hadoop/mapred/LocalJobRunner.java (Revision 567691)
+++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java (working copy)
@@ -20,9 +20,16 @@
import java.io.IOException;
import java.text.NumberFormat;
+import java.util.List;
import java.util.ArrayList;
+import java.util.Map;
import java.util.HashMap;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,7 +46,8 @@
private FileSystem fs;
private HashMap<String, Job> jobs = new HashMap<String, Job>();
private JobConf conf;
- private int map_tasks = 0;
+ private AtomicInteger map_tasks = new AtomicInteger();
+ private AtomicReference<Throwable> mapFailed = new AtomicReference<Throwable>();
private int reduce_tasks = 0;
private JobTrackerMetrics myMetrics = null;
@@ -47,7 +55,35 @@
public long getProtocolVersion(String protocol, long clientVersion) {
return JobSubmissionProtocol.versionID;
}
-
+
+ private class MapTaskRunner implements Runnable{
+ private MapTask map;
+ private JobConf localConf;
+ private Job umbilical;
+
+ public MapTaskRunner(MapTask map, JobConf localConf, Job umbilical){
+ this.map = map;
+ this.localConf = localConf;
+ this.umbilical = umbilical;
+ }
+
+ public void run(){
+ map_tasks.incrementAndGet();
+ myMetrics.launchMap();
+ try{
+ map.run(localConf, umbilical);
+ map.saveTaskOutput();
+ umbilical.updateCounters(map);
+ myMetrics.completeMap();
+ }catch(Throwable e){
+ LOG.warn("map task failed", e);
+ mapFailed.compareAndSet(null, e); // remember only the first failure
+ }finally{
+ map_tasks.decrementAndGet(); // release the slot even on failure
+ }
+ }
+ }
+
private class Job extends Thread
implements TaskUmbilicalProtocol {
private Path file;
@@ -61,6 +97,8 @@
private JobProfile profile;
private Path localFile;
private FileSystem localFs;
+ // per-task map progress; both fields are guarded by synchronizing on
+ // progressReports, since several map threads report concurrently
+ private Map<String, Float> progressReports = new HashMap<String, Float>();
+ private float totalProgress = 0.0f;
// Counters summed over all the map/reduce tasks which
// have successfully completed
@@ -101,7 +139,7 @@
try {
// split input into minimum number of splits
InputSplit[] splits;
- splits = job.getInputFormat().getSplits(job, 1);
+ splits = job.getInputFormat().getSplits(job, conf.getInt("mapred.tasktracker.tasks.maximum", 2));
String jobId = profile.getJobId();
int numReduceTasks = job.getNumReduceTasks();
@@ -110,7 +148,11 @@
numReduceTasks = 1;
job.setNumReduceTasks(1);
}
+
DataOutputBuffer buffer = new DataOutputBuffer();
+ ExecutorService pool = Executors.newFixedThreadPool(conf.getInt("mapred.tasktracker.tasks.maximum", 2));
+ Future<?>[] futures = new Future<?>[splits.length];
+ LOG.info("map tasks: " + splits.length);
for (int i = 0; i < splits.length; i++) {
String mapId = "map_" + idFormat.format(i);
mapIds.add(mapId);
@@ -125,14 +167,29 @@
JobConf localConf = new JobConf(job);
map.localizeConfiguration(localConf);
map.setConf(localConf);
- map_tasks += 1;
- myMetrics.launchMap();
- map.run(localConf, this);
- map.saveTaskOutput();
- myMetrics.completeMap();
- map_tasks -= 1;
- updateCounters(map);
+ synchronized (progressReports) { // earlier tasks may already be reporting
+   progressReports.put(mapId, 0.0f);
+ }
+ futures[i] = pool.submit(new MapTaskRunner(map, localConf, this));
}
+
+ boolean finished = false;
+ /* poll until all map tasks are done, failing fast on the first error */
+ while(!finished){
+   Thread.sleep(5000);
+   if(mapFailed.get() != null)
+     throw mapFailed.get(); // typed AtomicReference, no cast needed
+   finished = true;
+   for(int i = 0; i < futures.length; i++)
+     finished = finished && futures[i].isDone();
+ }
+ pool.shutdown();
+
String reduceId = "reduce_" + newId();
try {
if (numReduceTasks > 0) {
@@ -205,11 +262,14 @@
public boolean progress(String taskId, float progress, String state,
TaskStatus.Phase phase, Counters taskCounters) {
- LOG.info(state);
float taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
float numTasks = mapIds.size();
- status.setMapProgress(taskIndex/numTasks + progress/numTasks);
+ synchronized (progressReports) { // map threads report concurrently
+   totalProgress = totalProgress - progressReports.get(taskId) + progress / numTasks;
+   progressReports.put(taskId, progress / numTasks);
+   status.setMapProgress(totalProgress);
+ }
} else {
status.setReduceProgress(progress);
}
@@ -240,7 +300,10 @@
public void done(String taskId) throws IOException {
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
- status.setMapProgress(1.0f);
+ synchronized (progressReports) {
+   totalProgress = totalProgress - progressReports.get(taskId) + 1.0f / mapIds.size();
+   progressReports.put(taskId, 1.0f / mapIds.size());
+   status.setMapProgress(totalProgress);
+ }
} else {
status.setReduceProgress(1.0f);
}
@@ -306,7 +369,7 @@
}
public ClusterStatus getClusterStatus() {
- return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
+ return new ClusterStatus(1, map_tasks.get(), reduce_tasks, 1);
}
public JobStatus[] jobsToComplete() {return null;}
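For completeness, the patched runner sizes its thread pool from
"mapred.tasktracker.tasks.maximum" (defaulting to 2, as in the patch). A job
would drive it roughly like this (sketch; MyJob is a placeholder for the real
job class):

JobConf conf = new JobConf(MyJob.class); // placeholder job class
conf.set("mapred.job.tracker", "local"); // run through LocalJobRunner
conf.set("fs.default.name", "file:///"); // LocalFileSystem, as in my setup
conf.setInt("mapred.tasktracker.tasks.maximum", 8); // map threads in the pool
JobClient.runJob(conf);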