Author: cutting
Date: Wed Feb 21 12:45:18 2007
New Revision: 510190

URL: http://svn.apache.org/viewvc?view=rev&rev=510190
Log:
HADOOP-867. Move split creation out of JobTracker. Contributed by Owen.
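The heart of this change: the client now serializes each InputSplit to raw bytes plus its class name, writes them all into a job.split file next to job.xml, and the task process later re-instantiates the split by reflection (see the MapTask.run changes below). A minimal sketch of that round trip, using the same Hadoop I/O utilities the patch relies on — the class and method names here (SplitRoundTrip, toBytes, fromBytes) are illustrative only and not part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SplitRoundTrip {
      /** Serialize a split to opaque bytes, as the client side now does. */
      static byte[] toBytes(InputSplit split) throws IOException {
        DataOutputBuffer buffer = new DataOutputBuffer();
        split.write(buffer);
        byte[] result = new byte[buffer.getLength()];
        System.arraycopy(buffer.getData(), 0, result, 0, buffer.getLength());
        return result;
      }

      /** Re-create the split from its class name and bytes, as a task now does. */
      static InputSplit fromBytes(String splitClass, byte[] data, JobConf job)
          throws IOException {
        InputSplit split;
        try {
          // instantiate the user's split class by name via reflection
          split = (InputSplit)
            ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
        } catch (ClassNotFoundException e) {
          throw new IOException("Split class " + splitClass + " not found");
        }
        // then deserialize its fields from the raw bytes
        DataInputBuffer in = new DataInputBuffer();
        in.reset(data, 0, data.length);
        split.readFields(in);
        return split;
      }
    }

Because only a class name and opaque bytes cross process boundaries, the JobTracker itself never has to load user split classes, which is the point of the change.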
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=510190&r1=510189&r2=510190
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 21 12:45:18 2007
@@ -81,6 +81,11 @@
 24. HADOOP-1017. Cache constructors, for improved performance.
     (Ron Bodkin via cutting)
 
+25. HADOOP-867. Move split creation out of JobTracker to client.
+    Splits are now saved in a separate file, read by task processes
+    directly, so that user code is no longer required in the
+    JobTracker. (omalley via cutting)
+
 
 Release 0.11.2 - 2007-02-16

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java?view=diff&rev=510190&r1=510189&r2=510190
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java Wed Feb 21 12:45:18 2007
@@ -28,6 +28,13 @@
 public interface InputSplit extends Writable {
 
   /**
+   * Get the number of input bytes in the split.
+   * @return the number of bytes in the input split
+   * @throws IOException
+   */
+  long getLength() throws IOException;
+
+  /**
    * Get the list of hostnames where the input split is located.
* @return A list of prefered hostnames * @throws IOException Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Feb 21 12:45:18 2007 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.net.URL; @@ -26,7 +27,9 @@ import org.apache.commons.logging.*; import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; public class IsolationRunner { private static final Log LOG = @@ -152,12 +155,15 @@ Task task; if (isMap) { - FileSplit split = new FileSplit(new Path(conf.get("map.input.file")), - conf.getLong("map.input.start", 0), - conf.getLong("map.input.length", 0), - conf); + Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), + "split.dta"); + DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit); + String splitClass = Text.readString(splitFile); + BytesWritable split = new BytesWritable(); + split.readFields(splitFile); + splitFile.close(); task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), - taskId, partition, split); + taskId, partition, splitClass, split); } else { int numMaps = conf.getNumMapTasks(); fillInMissingMapOutputs(local, taskId, numMaps, conf); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Feb 21 12:45:18 2007 @@ -20,6 +20,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.util.*; @@ -38,7 +39,7 @@ *******************************************************/ public class JobClient extends ToolBase implements MRConstants { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient"); - public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL }; + public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL } private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; static long MAX_JOBPROFILE_AGE = 1000 * 2; @@ -259,7 +260,10 @@ Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36)); Path submitJobFile = new Path(submitJobDir, "job.xml"); Path submitJarFile = new Path(submitJobDir, "job.jar"); + Path submitSplitFile = new Path(submitJobDir, "job.split"); + FileSystem fs = getFs(); + LOG.debug("default FileSystem: " + fs.getUri()); // try getting the md5 of the archives URI[] tarchives = DistributedCache.getCacheArchives(job); URI[] tfiles = DistributedCache.getCacheFiles(job); @@ -317,8 +321,42 @@ // Check the output specification 
job.getOutputFormat().checkOutputSpecs(fs, job); + // Create the splits for the job + LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile)); + InputSplit[] splits = + job.getInputFormat().getSplits(job, job.getNumMapTasks()); + // sort the splits into order based on size, so that the biggest + // go first + Arrays.sort(splits, new Comparator() { + public int compare(Object a, Object b) { + try { + long left = ((InputSplit) a).getLength(); + long right = ((InputSplit) b).getLength(); + if (left == right) { + return 0; + } else if (left < right) { + return 1; + } else { + return -1; + } + } catch (IOException ie) { + throw new RuntimeException("Problem getting input split size", + ie); + } + } + }); + // write the splits to a file for the job tracker + FSDataOutputStream out = fs.create(submitSplitFile); + try { + writeSplitsFile(splits, out); + } finally { + out.close(); + } + job.set("mapred.job.split.file", submitSplitFile.toString()); + job.setNumMapTasks(splits.length); + // Write job file to JobTracker's fs - FSDataOutputStream out = fs.create(submitJobFile, replication); + out = fs.create(submitJobFile, replication); try { job.write(out); } finally { @@ -336,6 +374,108 @@ } } + static class RawSplit implements Writable { + private String splitClass; + private BytesWritable bytes = new BytesWritable(); + private String[] locations; + + public void setBytes(byte[] data, int offset, int length) { + bytes.set(data, offset, length); + } + + public void setClassName(String className) { + splitClass = className; + } + + public String getClassName() { + return splitClass; + } + + public BytesWritable getBytes() { + return bytes; + } + + public void setLocations(String[] locations) { + this.locations = locations; + } + + public String[] getLocations() { + return locations; + } + + public void readFields(DataInput in) throws IOException { + splitClass = Text.readString(in); + bytes.readFields(in); + int len = WritableUtils.readVInt(in); + locations = new String[len]; + for(int i=0; i < len; ++i) { + locations[i] = Text.readString(in); + } + } + + public void write(DataOutput out) throws IOException { + Text.writeString(out, splitClass); + bytes.write(out); + WritableUtils.writeVInt(out, locations.length); + for(int i = 0; i < locations.length; i++) { + Text.writeString(out, locations[i]); + } + } + } + + private static final int CURRENT_SPLIT_FILE_VERSION = 0; + private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes(); + + /** Create the list of input splits and write them out in a file for + *the JobTracker. 
The format is: + * <format version> + * <numSplits> + * for each split: + * <RawSplit> + * @param splits the input splits to write out + * @param out the stream to write to + */ + private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException { + out.write(SPLIT_FILE_HEADER); + WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION); + WritableUtils.writeVInt(out, splits.length); + DataOutputBuffer buffer = new DataOutputBuffer(); + RawSplit rawSplit = new RawSplit(); + for(InputSplit split: splits) { + rawSplit.setClassName(split.getClass().getName()); + buffer.reset(); + split.write(buffer); + rawSplit.setBytes(buffer.getData(), 0, buffer.getLength()); + rawSplit.setLocations(split.getLocations()); + rawSplit.write(out); + } + } + + /** + * Read a splits file into a list of raw splits + * @param in the stream to read from + * @return the complete list of splits + * @throws IOException + */ + static RawSplit[] readSplitFile(DataInput in) throws IOException { + byte[] header = new byte[SPLIT_FILE_HEADER.length]; + in.readFully(header); + if (!Arrays.equals(SPLIT_FILE_HEADER, header)) { + throw new IOException("Invalid header on split file"); + } + int vers = WritableUtils.readVInt(in); + if (vers != CURRENT_SPLIT_FILE_VERSION) { + throw new IOException("Unsupported split version " + vers); + } + int len = WritableUtils.readVInt(in); + RawSplit[] result = new RawSplit[len]; + for(int i=0; i < len; ++i) { + result[i] = new RawSplit(); + result[i].readFields(in); + } + return result; + } + /** * Get an RunningJob object to track an ongoing job. Returns * null if the id does not correspond to any known job. @@ -384,15 +524,13 @@ String lastReport = null; final int MAX_RETRIES = 5; int retries = MAX_RETRIES; - String outputFilterName = job.get("jobclient.output.filter", "FAILED"); - - if (null != outputFilterName) { - try { - jc.setTaskOutputFilter(TaskStatusFilter.valueOf(outputFilterName)); - } catch(IllegalArgumentException e) { - LOG.warn("Invalid Output filter : " + outputFilterName + - " Valid values are : NONE, FAILED, SUCCEEDED, ALL"); - } + TaskStatusFilter filter; + try { + filter = getTaskOutputFilter(job); + } catch(IllegalArgumentException e) { + LOG.warn("Invalid Output filter : " + e.getMessage() + + " Valid values are : NONE, FAILED, SUCCEEDED, ALL"); + throw e; } try { running = jc.submitJob(job); @@ -418,12 +556,12 @@ lastReport = report; } - if( jc.getTaskOutputFilter() != TaskStatusFilter.NONE){ + if( filter != TaskStatusFilter.NONE){ TaskCompletionEvent[] events = running.getTaskCompletionEvents(eventCounter); eventCounter += events.length ; for(TaskCompletionEvent event : events ){ - switch( jc.getTaskOutputFilter() ){ + switch( filter ){ case SUCCEEDED: if( event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED){ @@ -524,13 +662,36 @@ * output matches the filter. * @param newValue task filter. 
*/ + @Deprecated public void setTaskOutputFilter(TaskStatusFilter newValue){ this.taskOutputFilter = newValue ; } + + /** + * Get the task output filter out of the JobConf + * @param job the JobConf to examine + * @return the filter level + */ + public static TaskStatusFilter getTaskOutputFilter(JobConf job) { + return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", + "FAILED")); + } + + /** + * Modify the JobConf to set the task output filter + * @param job the JobConf to modify + * @param newValue the value to set + */ + public static void setTaskOutputFilter(JobConf job, + TaskStatusFilter newValue) { + job.set("jobclient.output.filter", newValue.toString()); + } + /** * Returns task output filter. * @return task filter. */ + @Deprecated public TaskStatusFilter getTaskOutputFilter(){ return this.taskOutputFilter; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Feb 21 12:45:18 2007 @@ -21,6 +21,9 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics; import org.apache.hadoop.mapred.JobHistory.Values ; import java.io.*; @@ -52,7 +55,7 @@ int failedMapTasks = 0 ; int failedReduceTasks = 0 ; JobTracker jobtracker = null; - HashMap hostToMaps = new HashMap(); + Map<String,List<TaskInProgress>> hostToMaps = new HashMap(); private int taskCompletionEventTracker = 0 ; List<TaskCompletionEvent> taskCompletionEvents ; @@ -114,35 +117,35 @@ } // - // construct input splits + // read input splits and create a map per a split // String jobFile = profile.getJobFile(); FileSystem fs = FileSystem.get(conf); - if (localJarFile != null) { - ClassLoader loader = - new URLClassLoader(new URL[]{ localFs.pathToFile(localJarFile).toURL() }); - conf.setClassLoader(loader); - } - InputFormat inputFormat = conf.getInputFormat(); - - InputSplit[] splits = inputFormat.getSplits(conf, numMapTasks); - - // - // sort splits by decreasing length, to reduce job's tail - // - Arrays.sort(splits, new Comparator() { - public int compare(Object a, Object b) { - long diff = - ((FileSplit)b).getLength() - ((FileSplit)a).getLength(); - return diff==0 ? 0 : (diff > 0 ? 
1 : -1); + DataInputStream splitFile = + fs.open(new Path(conf.get("mapred.job.split.file"))); + JobClient.RawSplit[] splits; + try { + splits = JobClient.readSplitFile(splitFile); + } finally { + splitFile.close(); + } + numMapTasks = splits.length; + maps = new TaskInProgress[numMapTasks]; + for(int i=0; i < numMapTasks; ++i) { + maps[i] = new TaskInProgress(uniqueString, jobFile, + splits[i].getClassName(), + splits[i].getBytes(), + jobtracker, conf, this, i); + for(String host: splits[i].getLocations()) { + List<TaskInProgress> hostMaps = hostToMaps.get(host); + if (hostMaps == null) { + hostMaps = new ArrayList(); + hostToMaps.put(host, hostMaps); } - }); - - // - // adjust number of map tasks to actual number of splits - // - this.numMapTasks = splits.length; + hostMaps.add(maps[i]); + } + } // if no split is returned, job is considered completed and successful if (numMapTasks == 0) { @@ -154,13 +157,6 @@ return; } - // create a map task for each split - this.maps = new TaskInProgress[numMapTasks]; - for (int i = 0; i < numMapTasks; i++) { - maps[i] = new TaskInProgress(uniqueString, jobFile, splits[i], - jobtracker, conf, this, i); - } - // // Create reduce tasks // @@ -171,22 +167,6 @@ jobtracker, conf, this); } - // - // Obtain some tasktracker-cache information for the map task splits. - // - for (int i = 0; i < maps.length; i++) { - String hints[] = splits[i].getLocations(); - for (int k = 0; k < hints.length; k++) { - ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k]); - if (hostMaps == null) { - hostMaps = new ArrayList(); - hostToMaps.put(hints[k], hostMaps); - } - hostMaps.add(maps[i]); - - } - } - this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING); tasksInited = true; @@ -342,7 +322,8 @@ /** * Return a MapTask, if appropriate, to run on the given tasktracker */ - public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) { + public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize + ) throws IOException { if (! tasksInited) { LOG.info("Cannot create task split for " + profile.getJobId()); return null; @@ -370,7 +351,7 @@ * work on temporary MapRed files. */ public Task obtainNewReduceTask(TaskTrackerStatus tts, - int clusterSize) { + int clusterSize) throws IOException { if (! tasksInited) { LOG.info("Cannot create task split for " + profile.getJobId()); return null; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Feb 21 12:45:18 2007 @@ -1143,7 +1143,8 @@ * and incorporate knowledge of DFS file placement. But for right now, it * just grabs a single item out of the pending task list and hands it back. 
*/ - private synchronized Task getNewTaskForTaskTracker(String taskTracker) { + private synchronized Task getNewTaskForTaskTracker(String taskTracker + ) throws IOException { // // Compute average map and reduce task numbers across pool // Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Feb 21 12:45:18 2007 @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics; /** Implements MapReduce locally, in-process, for debugging. */ @@ -95,12 +96,18 @@ // run a map task for each split job.setNumReduceTasks(1); // force a single reduce task + DataOutputBuffer buffer = new DataOutputBuffer(); for (int i = 0; i < splits.length; i++) { String mapId = "map_" + newId() ; mapIds.add(mapId); + buffer.reset(); + splits[i].write(buffer); + BytesWritable split = new BytesWritable(); + split.set(buffer.getData(), 0, buffer.getLength()); MapTask map = new MapTask(jobId, file, "tip_m_" + mapId, mapId, i, - splits[i]); + splits[i].getClass().getName(), + split); JobConf localConf = new JobConf(job); map.localizeConfiguration(localConf); map.setConf(localConf); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Feb 21 12:45:18 2007 @@ -44,17 +44,15 @@ /** A Map task. 
*/ class MapTask extends Task { + + private MapTaskMetrics myMetrics = null; - public static final Log LOG = - LogFactory.getLog("org.apache.hadoop.mapred.MapTask"); + private BytesWritable split = new BytesWritable(); + private String splitClass; + private MapOutputFile mapOutputFile = new MapOutputFile(); + private JobConf conf; - static { // register a ctor - WritableFactories.setFactory - (MapTask.class, - new WritableFactory() { - public Writable newInstance() { return new MapTask(); } - }); - } + private static final Log LOG = LogFactory.getLog(MapTask.class.getName()); { // set phase for this task setPhase(TaskStatus.Phase.MAP); @@ -83,19 +81,15 @@ } } - - private MapTaskMetrics myMetrics = null; - - private InputSplit split; - private MapOutputFile mapOutputFile = new MapOutputFile(); - private JobConf conf; public MapTask() {} public MapTask(String jobId, String jobFile, String tipId, String taskId, - int partition, InputSplit split) { + int partition, String splitClass, BytesWritable split + ) throws IOException { super(jobId, jobFile, tipId, taskId, partition); - this.split = split; + this.splitClass = splitClass; + this.split.set(split); } public boolean isMapTask() { @@ -107,30 +101,25 @@ Path localSplit = new Path(new Path(getJobFile()).getParent(), "split.dta"); DataOutputStream out = FileSystem.getLocal(conf).create(localSplit); + Text.writeString(out, splitClass); split.write(out); out.close(); - if (split instanceof FileSplit) { - conf.set("map.input.file", ((FileSplit) split).getPath().toString()); - conf.setLong("map.input.start", ((FileSplit) split).getStart()); - conf.setLong("map.input.length", ((FileSplit) split).getLength()); - } } public TaskRunner createRunner(TaskTracker tracker) { return new MapTaskRunner(this, tracker, this.conf); } - public InputSplit getSplit() { return split; } - public void write(DataOutput out) throws IOException { super.write(out); + Text.writeString(out, splitClass); split.write(out); } + public void readFields(DataInput in) throws IOException { super.readFields(in); - - split = new FileSplit(); + splitClass = Text.readString(in); split.readFields(in); if (myMetrics == null) { myMetrics = new MapTaskMetrics("unknown"); @@ -144,6 +133,28 @@ Reporter reporter = getReporter(umbilical, getProgress()); MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter); + + // reinstantiate the split + InputSplit split; + try { + split = (InputSplit) + ReflectionUtils.newInstance(job.getClassByName(splitClass), job); + } catch (ClassNotFoundException exp) { + IOException wrap = new IOException("Split class " + splitClass + + " not found"); + wrap.initCause(exp); + throw wrap; + } + DataInputBuffer splitBuffer = new DataInputBuffer(); + splitBuffer.reset(this.split.get(), 0, this.split.getSize()); + split.readFields(splitBuffer); + + // if it is a file split, we can give more details + if (split instanceof FileSplit) { + job.set("map.input.file", ((FileSplit) split).getPath().toString()); + job.setLong("map.input.start", ((FileSplit) split).getStart()); + job.setLong("map.input.length", ((FileSplit) split).getLength()); + } final RecordReader rawIn = // open input job.getInputFormat().getRecordReader(split, job, reporter); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Feb 21 12:45:18 2007 @@ -150,7 +150,7 @@ public Progress getProgress() { return taskProgress; } - public Reporter getReporter(final TaskUmbilicalProtocol umbilical, + protected Reporter getReporter(final TaskUmbilicalProtocol umbilical, final Progress progress) throws IOException { return new Reporter() { public void setStatus(String status) throws IOException { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Feb 21 12:45:18 2007 @@ -18,8 +18,10 @@ package org.apache.hadoop.mapred; import org.apache.commons.logging.*; +import org.apache.hadoop.io.BytesWritable; import java.text.NumberFormat; +import java.io.IOException; import java.util.*; @@ -52,7 +54,8 @@ // Defines the TIP private String jobFile = null; - private InputSplit split = null; + private String splitClass = null; + private BytesWritable split = null; private int numMaps; private int partition; private JobTracker jobtracker; @@ -93,10 +96,12 @@ /** * Constructor for MapTask */ - public TaskInProgress(String uniqueString, String jobFile, InputSplit split, + public TaskInProgress(String uniqueString, String jobFile, + String splitClass, BytesWritable split, JobTracker jobtracker, JobConf conf, JobInProgress job, int partition) { this.jobFile = jobFile; + this.splitClass = splitClass; this.split = split; this.jobtracker = jobtracker; this.job = job; @@ -501,7 +506,7 @@ /** * Return a Task that can be sent to a TaskTracker for execution. 
*/ - public Task getTaskToRun(String taskTracker) { + public Task getTaskToRun(String taskTracker) throws IOException { Task t = null; if( 0 == execStartTime ){ // assume task starts running now @@ -522,7 +527,8 @@ String jobId = job.getProfile().getJobId(); if (isMapTask()) { - t = new MapTask(jobId, jobFile, this.id, taskid, partition, split); + t = new MapTask(jobId, jobFile, this.id, taskid, partition, + splitClass, split); } else { t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb 21 12:45:18 2007 @@ -913,8 +913,8 @@ volatile TaskStatus.State runstate; long lastProgressReport; StringBuffer diagnosticInfo = new StringBuffer(); - TaskRunner runner; - boolean done = false; + private TaskRunner runner; + volatile boolean done = false; boolean wasKilled = false; private JobConf defaultJobConf; private JobConf localJobConf; @@ -1226,7 +1226,9 @@ } synchronized (this) { try { - runner.close(); + if (runner != null) { + runner.close(); + } defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR + task.getJobId() + @@ -1398,6 +1400,7 @@ Task task = umbilical.getTask(taskid); JobConf job = new JobConf(task.getJobFile()); + task.setConf(job); defaultConf.addFinalResource(new Path(task.getJobFile())); @@ -1468,16 +1471,28 @@ * job tracker in the next heartbeat cycle. * @return a copy of the list of TaskStatus objects */ - synchronized List getRunningTaskStatuses() { - List result = new ArrayList(runningTasks.size()); - Iterator itr = runningTasks.values().iterator(); - while (itr.hasNext()) { - TaskInProgress tip = (TaskInProgress) itr.next(); + synchronized List<TaskStatus> getRunningTaskStatuses() { + List<TaskStatus> result = new ArrayList(runningTasks.size()); + for(TaskInProgress tip: runningTasks.values()) { result.add(tip.createStatus()); } return result; } - + + /** + * Get the list of stored tasks on this task tracker. + * @return + */ + synchronized List<TaskStatus> getNonRunningTasks() { + List<TaskStatus> result = new ArrayList(tasks.size()); + for(Map.Entry<String, TaskInProgress> task: tasks.entrySet()) { + if (!runningTasks.containsKey(task.getKey())) { + result.add(task.getValue().createStatus()); + } + } + return result; + } + /** * Get the default job conf for this tracker. */ Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Wed Feb 21 12:45:18 2007 @@ -216,6 +216,8 @@ // this timeout controls the minimum time for the test, so // set it down at 1 seconds. 
result.setInt("ipc.client.timeout", 1000); + // for debugging have all task output sent to the test output + JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL); return result; } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Wed Feb 21 12:45:18 2007 @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.Random; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -43,12 +42,6 @@ */ public static class PiMapper extends MapReduceBase implements Mapper { - - /** Mapper configuration. - * - */ - public void configure(JobConf job) { - } static Random r = new Random(); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Wed Feb 21 12:45:18 2007 @@ -18,8 +18,18 @@ package org.apache.hadoop.mapred; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.util.Progressable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.io.File; +import java.util.Iterator; import junit.framework.TestCase; /** @@ -49,8 +59,9 @@ TEST_ROOT_DIR + "/wc/output", TEST_ROOT_DIR + "/cachedir", job, - "The quick brown fox\nhas many silly\n" - + "red fox sox\n"); + "The quick brown fox\n" + + "has many silly\n" + + "red fox sox\n"); // assert the number of lines read during caching assertTrue("Failed test archives not matching", ret); // test the task report fetchers @@ -59,8 +70,193 @@ assertEquals("number of maps", 10, reports.length); reports = client.getReduceTaskReports("job_0001"); assertEquals("number of reduces", 1, reports.length); + runCustomFormats(mr); } finally { if (mr != null) { mr.shutdown(); } } + } + + private void runCustomFormats(MiniMRCluster mr) throws IOException { + JobConf job = mr.createJobConf(); + FileSystem fileSys = FileSystem.get(job); + Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local"); + Path outDir = new Path(testDir, "out"); + System.out.println("testDir= " + testDir); + fileSys.delete(testDir); + + job.setInputFormat(MyInputFormat.class); + job.setOutputFormat(MyOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(MyMapper.class); + job.setReducerClass(MyReducer.class); + job.setNumMapTasks(100); + job.setNumReduceTasks(1); + // explicitly do not use "normal" job.setOutputPath to make sure + // that it is not hardcoded anywhere in the framework. 
+ job.set("non.std.out", outDir.toString()); + try { + JobClient.runJob(job); + String result = + TestMiniMRWithDFS.readOutput(outDir, job); + assertEquals("output", ("aunt annie\t1\n" + + "bumble boat\t4\n" + + "crocodile pants\t0\n" + + "duck-dog\t5\n"+ + "eggs\t2\n" + + "finagle the agent\t3\n"), result); + } finally { + fileSys.delete(testDir); + } + + } + + private static class MyInputFormat implements InputFormat { + static final String[] data = new String[]{ + "crocodile pants", + "aunt annie", + "eggs", + "finagle the agent", + "bumble boat", + "duck-dog", + }; + + private static class MySplit implements InputSplit { + int first; + int length; + + public MySplit() { } + + public MySplit(int first, int length) { + this.first = first; + this.length = length; + } + + public String[] getLocations() { + return new String[0]; + } + + public long getLength() { + return length; + } + + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, first); + WritableUtils.writeVInt(out, length); + } + + public void readFields(DataInput in) throws IOException { + first = WritableUtils.readVInt(in); + length = WritableUtils.readVInt(in); + } + } + + static class MyRecordReader implements RecordReader { + int index; + int past; + int length; + + MyRecordReader(int index, int length) { + this.index = index; + this.past = index + length; + this.length = length; + } + + public boolean next(Writable key, Writable value) throws IOException { + if (index < past) { + ((IntWritable) key).set(index); + ((Text) value).set(data[index]); + index += 1; + return true; + } + return false; + } + + public WritableComparable createKey() { + return new IntWritable(); + } + + public Writable createValue() { + return new Text(); + } + + public long getPos() throws IOException { + return index; + } + + public void close() throws IOException {} + + public float getProgress() throws IOException { + return 1.0f - (past-index)/length; + } + } + + public void validateInput(JobConf job) throws IOException { + } + + public InputSplit[] getSplits(JobConf job, + int numSplits) throws IOException { + return new MySplit[]{new MySplit(0,1), new MySplit(1,3), + new MySplit(4,2)}; + } + + public RecordReader getRecordReader(InputSplit split, + JobConf job, + Reporter reporter) throws IOException { + MySplit sp = (MySplit) split; + return new MyRecordReader(sp.first, sp.length); + } + + } + + static class MyMapper extends MapReduceBase implements Mapper { + public void map(WritableComparable key, Writable value, + OutputCollector out, Reporter reporter) throws IOException { + System.out.println("map: " + key + ", " + value); + out.collect((WritableComparable) value, key); + } + } + + static class MyReducer extends MapReduceBase implements Reducer { + public void reduce(WritableComparable key, Iterator values, + OutputCollector output, Reporter reporter + ) throws IOException { + while (values.hasNext()) { + Writable value = (Writable) values.next(); + System.out.println("reduce: " + key + ", " + value); + output.collect(key, value); + } + } + } + + static class MyOutputFormat implements OutputFormat { + static class MyRecordWriter implements RecordWriter { + private DataOutputStream out; + + public MyRecordWriter(Path outputFile, JobConf job) throws IOException { + out = outputFile.getFileSystem(job).create(outputFile); + } + + public void write(WritableComparable key, + Writable value) throws IOException { + out.writeBytes(key.toString() + "\t" + value.toString() + "\n"); + } + + public void close(Reporter 
reporter) throws IOException { + out.close(); + } + } + + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, + String name, + Progressable progress + ) throws IOException { + return new MyRecordWriter(new Path(job.get("non.std.out")), job); + } + + public void checkOutputSpecs(FileSystem ignored, + JobConf job) throws IOException { + } } } Modified: lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp?view=diff&rev=510190&r1=510189&r2=510190 ============================================================================== --- lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp (original) +++ lucene/hadoop/trunk/src/webapps/task/tasktracker.jsp Wed Feb 21 12:45:18 2007 @@ -46,6 +46,17 @@ </table> </center> +<h2>Non-Running Tasks</h2> +<table border=2 cellpadding="5" cellspacing="2"> +<tr><td align="center">Task Attempts</td><td>Status</td> + <% + for(TaskStatus status: tracker.getNonRunningTasks()) { + out.print("<tr><td>" + status.getTaskId() + "</td>"); + out.print("<td>" + status.getRunState() + "</td></tr>\n"); + } + %> +</table> + <h2>Local Logs</h2> <a href="/logs/">Log</a> directory
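For reference, the job.split file written by writeSplitsFile() above has a simple layout: a 3-byte "SPL" header, a vint format version (currently 0), a vint split count, and then one RawSplit record per split (the split class name as a Text string, the serialized split as a BytesWritable, and a vint-counted list of location hostnames). A hypothetical diagnostic that dumps such a file — not part of the patch, and the class name DumpSplitFile is illustrative — could be written against that format as follows:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class DumpSplitFile {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path(args[0]));
        try {
          byte[] header = new byte[3];              // the "SPL" marker
          in.readFully(header);
          int version = WritableUtils.readVInt(in); // split file version, currently 0
          int numSplits = WritableUtils.readVInt(in);
          System.out.println("version=" + version + " splits=" + numSplits);
          for (int i = 0; i < numSplits; i++) {
            String splitClass = Text.readString(in);   // class name of the split
            BytesWritable bytes = new BytesWritable(); // serialized split body
            bytes.readFields(in);
            int numLocations = WritableUtils.readVInt(in);
            StringBuffer hosts = new StringBuffer();
            for (int j = 0; j < numLocations; j++) {
              hosts.append(Text.readString(in)).append(' ');
            }
            System.out.println(splitClass + ": " + bytes.getSize() +
                               " bytes, hosts: " + hosts);
          }
        } finally {
          in.close();
        }
      }
    }

JobInProgress reads these same records via JobClient.readSplitFile() when it builds one TaskInProgress per split, and each task later re-reads its own record from the localized split.dta file.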