Author: cutting
Date: Thu Feb 22 12:22:15 2007
New Revision: 510644

URL: http://svn.apache.org/viewvc?view=rev&rev=510644
Log:
HADOOP-248.  Optimize location of map outputs to not use random probes.  Contributed by Devaraj.
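In outline: before this change, each reduce asked the JobTracker about a random subset of the map ids it still needed; after it, the reduce keeps a cursor into the job's append-only task-completion-event log and simply pages forward. A minimal sketch of the new scheme follows — Event and Tracker here are simplified stand-ins, not real Hadoop types; the real ones (TaskCompletionEvent, InterTrackerProtocol) appear in the diffs below.

import java.net.URI;
import java.util.List;

class EventLogPollSketch {
  static class Event {                       // stand-in for TaskCompletionEvent
    boolean isMap;
    boolean succeeded;
    int idWithinJob;                         // map number within the job
    String taskTrackerHttp;                  // e.g. "http://host:50060"
  }

  interface Tracker {                        // stand-in for the JobTracker RPC
    Event[] getTaskCompletionEvents(String jobId, int fromEventId, int maxEvents);
  }

  /** One poll: collect new map-output hosts, return the advanced cursor. */
  static int poll(Tracker tracker, String jobId, int fromEventId,
                  List<String> mapOutputHosts) {
    Event[] events = tracker.getTaskCompletionEvents(jobId, fromEventId, 50);
    for (Event e : events) {
      if (e.isMap && e.succeeded) {          // only successful maps are fetchable
        mapOutputHosts.add(URI.create(e.taskTrackerHttp).getHost());
      }
    }
    return fromEventId + events.length;      // old events are never re-read
  }
}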
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Feb 22 12:22:15 2007
@@ -116,6 +116,9 @@
 34. HADOOP-985.  Change HDFS to identify nodes by IP address rather than
     by DNS hostname.  (Raghu Angadi via cutting)
 
+35. HADOOP-248.  Optimize location of map outputs to not use random
+    probes.  (Devaraj Das via cutting)
+
 Release 0.11.2 - 2007-02-16

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Thu Feb 22 12:22:15 2007
@@ -32,8 +32,10 @@
    * emitHearbeat/pollForNewTask/pollForTaskWithClosedJob with
    * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)}
    * version 4 changed TaskReport for HADOOP-549.
+   * version 5 introduced that removes locateMapOutputs and instead uses
+   * getTaskCompletionEvents to figure finished maps and fetch the outputs
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -63,18 +65,6 @@
                                          boolean initialContact, boolean acceptNewTasks, short responseId)
     throws IOException;
   
-  /** Called by a reduce task to find which map tasks are completed.
-   *
-   * @param jobId the job id
-   * @param mapTasksNeeded an array of the mapIds that we need
-   * @param partition the reduce's id
-   * @return an array of MapOutputLocation
-   */
-  MapOutputLocation[] locateMapOutputs(String jobId, 
-                                       int[] mapTasksNeeded,
-                                       int partition
-                                       ) throws IOException;
-
  /**
   * The task tracker calls this once, to discern where it can find
   * files referred to by the JobTracker
@@ -97,11 +87,12 @@
    * Returns empty aray if no events are available. 
    * @param jobid job id 
    * @param fromEventId event id to start from.
+   * @param maxEvents the max number of events we want to look at 
    * @return array of task completion events. 
    * @throws IOException
    */
  TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException;
+      String jobid, int fromEventId, int maxEvents) throws IOException;
 }
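The bump from 4L to 5L matters because removing locateMapOutputs and widening getTaskCompletionEvents is an incompatible RPC change: a TaskTracker built against version 4 must be turned away at connection time rather than fail later with a missing-method error. An illustrative guard of the kind the versionID enables — this is a sketch, not the actual RPC-layer code:

import java.io.IOException;

class VersionGuardSketch {
  static final long SERVER_VERSION = 5L;   // InterTrackerProtocol.versionID above

  static void checkVersion(long clientVersion) throws IOException {
    if (clientVersion != SERVER_VERSION) {
      throw new IOException("Protocol version mismatch: client expects "
          + clientVersion + ", server speaks " + SERVER_VERSION);
    }
  }
}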
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Thu Feb 22 12:22:15 2007
@@ -158,7 +158,7 @@
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
         int startFrom) throws IOException{
       return jobSubmitClient.getTaskCompletionEvents(
-          getJobID(), startFrom); 
+          getJobID(), startFrom, 10); 
     }
 
     /**
@@ -726,7 +726,9 @@
         killJob = true;
         i++;
       } else if ("-events".equals(argv[i])) {
-        listEvents(argv[++i]);
+        listEvents(argv[i+1], Integer.parseInt(argv[i+2]), 
+                   Integer.parseInt(argv[i+3]));
+        i += 3;
       }
     }
 
@@ -766,10 +768,13 @@
      * @param jobId the job id for the job's events to list
      * @throws IOException
      */
-    private void listEvents(String jobId) throws IOException {
+    private void listEvents(String jobId, int fromEventId, int numEvents)
+      throws IOException {
       TaskCompletionEvent[] events = 
-        jobSubmitClient.getTaskCompletionEvents(jobId, 0);
+        jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
       System.out.println("Task completion events for " + jobId);
+      System.out.println("Number of events (from " + fromEventId + 
+          ") are: " + events.length);
       for(TaskCompletionEvent event: events) {
         System.out.println(event.getTaskStatus() + " " + event.getTaskId() + 
             " " + event.getTaskTrackerHttp());
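With the argument parsing above, the events subcommand now takes an explicit window, e.g. `bin/hadoop job -events <job-id> <from-event-#> <#-of-events>`. The programmatic path, by contrast, fixes the window at 10 per call, so a caller that wants the whole log has to loop. A sketch, assuming the RunningJob handle exposes the same getTaskCompletionEvents(int) that the JobClient hunk above implements:

import java.io.IOException;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

class DrainEventsSketch {
  // Drain the whole event log through the fixed 10-per-call window.
  static void printAllEvents(RunningJob job) throws IOException {
    int cursor = 0;
    while (true) {
      TaskCompletionEvent[] batch = job.getTaskCompletionEvents(cursor);
      if (batch.length == 0) break;          // caught up with the tracker
      for (TaskCompletionEvent e : batch) {
        System.out.println(e.getTaskStatus() + " " + e.getTaskId()
            + " " + e.getTaskTrackerHttp());
      }
      cursor += batch.length;                // advance past what we printed
    }
  }
}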
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 22 12:22:15 2007
@@ -282,24 +282,40 @@
         if (state == TaskStatus.State.SUCCEEDED) {
           this.taskCompletionEvents.add(
             new TaskCompletionEvent(
-              taskCompletionEventTracker++, 
-              status.getTaskId(),
+              taskCompletionEventTracker, 
+              status.getTaskId(),
+              tip.idWithinJob(),
+              status.getIsMap(),
               TaskCompletionEvent.Status.SUCCEEDED,
               httpTaskLogLocation ));
+          tip.setSuccessEventNumber(taskCompletionEventTracker);
           completedTask(tip, status, metrics);
         } else if (state == TaskStatus.State.FAILED ||
                    state == TaskStatus.State.KILLED) {
           this.taskCompletionEvents.add(
             new TaskCompletionEvent(
-              taskCompletionEventTracker++, 
-              status.getTaskId(),
+              taskCompletionEventTracker, 
+              status.getTaskId(),
+              tip.idWithinJob(),
+              status.getIsMap(),
               TaskCompletionEvent.Status.FAILED, 
               httpTaskLogLocation ));
+          // Get the event number for the (possibly) previously successful
+          // task.  If there exists one, then set that status to OBSOLETE
+          int eventNumber;
+          if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+            TaskCompletionEvent t = 
+              this.taskCompletionEvents.get(eventNumber);
+            if (t.getTaskId().equals(status.getTaskId()))
+              t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+          }
           // Tell the job to fail the relevant task
           failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                      wasRunning, wasComplete);
         }
       }
+      taskCompletionEventTracker++;
       
       //
       // Update JobInProgress status
       //
@@ -849,12 +865,14 @@
     return null;
   }
   
-  public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
+  public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId, 
+                                                       int maxEvents) {
     TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
     if( taskCompletionEvents.size() > fromEventId) {
+      int actualMax = Math.min(maxEvents, 
+                               (taskCompletionEvents.size() - fromEventId));
       events = (TaskCompletionEvent[])taskCompletionEvents.subList(
-          fromEventId, taskCompletionEvents.size()).
-          toArray(events);
+          fromEventId, actualMax + fromEventId).toArray(events);
     }
     return events;
  }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Feb 22 12:22:15 2007
@@ -28,7 +28,11 @@
  * the current system status.
  */ 
 public interface JobSubmissionProtocol extends VersionedProtocol {
-  public static final long versionID = 1L;
+  /*
+   *Changing the versionID to 2L since the getTaskCompletionEvents method has
+   *changed
+   */
+  public static final long versionID = 2L;
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
@@ -85,11 +89,12 @@
    * Get task completion events for the jobid, starting from fromEventId. 
    * Returns empty aray if no events are available. 
    * @param jobid job id 
-   * @param fromEventId event id to start from.
+   * @param fromEventId event id to start from. 
+   * @param maxEvents the max number of events we want to look at 
    * @return array of task completion events. 
    * @throws IOException
    */
   public TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException;
+      String jobid, int fromEventId, int maxEvents) throws IOException;
 
 }
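A note on the OBSOLETE trick in the JobInProgress hunk above: event ids are positions in an append-only list, so a stale success can only be marked, never removed — removal would shift every later event and break readers holding a fromEventId cursor. A self-contained mirror of that bookkeeping (names simplified; the real state lives across JobInProgress and TaskInProgress):

import java.util.ArrayList;
import java.util.List;

class ObsoleteEventSketch {
  enum Status { FAILED, SUCCEEDED, OBSOLETE }

  static class Event {
    final String taskId;
    Status status;
    Event(String taskId, Status status) { this.taskId = taskId; this.status = status; }
  }

  final List<Event> log = new ArrayList<Event>();
  int successEventNumber = -1;             // mirrors TaskInProgress bookkeeping

  void taskSucceeded(String taskId) {
    log.add(new Event(taskId, Status.SUCCEEDED));
    successEventNumber = log.size() - 1;   // remember where the success sits
  }

  void taskFailed(String taskId) {
    log.add(new Event(taskId, Status.FAILED));
    if (successEventNumber != -1) {
      Event old = log.get(successEventNumber);
      if (old.taskId.equals(taskId)) {
        old.status = Status.OBSOLETE;      // flip in place; ids stay stable
      }
    }
  }
}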
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Feb 22 12:22:15 2007
@@ -1321,48 +1321,6 @@
     }
 
     /**
-     * A TaskTracker wants to know the physical locations of completed, but not
-     * yet closed, tasks.  This exists so the reduce task thread can locate
-     * map task outputs.
-     */
-    public synchronized MapOutputLocation[] 
-             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) 
-            throws IOException {
-        // Check to make sure that the job hasn't 'completed'.
-        JobInProgress job = getJob(jobId);
-        if (job.status.getRunState() != JobStatus.RUNNING) {
-            return new MapOutputLocation[0];
-        }
-
-        ArrayList result = new ArrayList(mapTasksNeeded.length);
-        for (int i = 0; i < mapTasksNeeded.length; i++) {
-            TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
-            if (status != null) {
-                String trackerId = 
-                  (String) taskidToTrackerMap.get(status.getTaskId());
-                // Safety check, if we can't find the taskid in 
-                // taskidToTrackerMap and job isn't 'running', then just
-                // return an empty array
-                if (trackerId == null && 
-                    job.status.getRunState() != JobStatus.RUNNING) {
-                    return new MapOutputLocation[0];
-                }
-
-                TaskTrackerStatus tracker;
-                synchronized (taskTrackers) {
-                    tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
-                }
-                result.add(new MapOutputLocation(status.getTaskId(), 
-                                                 mapTasksNeeded[i],
-                                                 tracker.getHost(), 
-                                                 tracker.getHttpPort()));
-            }
-        }
-        return (MapOutputLocation[]) 
-               result.toArray(new MapOutputLocation[result.size()]);
-    }
-
-    /**
      * Grab the local fs name
      */
     public synchronized String getFilesystemName() throws IOException {
@@ -1493,14 +1451,14 @@
     /* 
      * Returns a list of TaskCompletionEvent for the given job, 
      * starting from fromEventId.
-     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
+     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
      */
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-        String jobid, int fromEventId) throws IOException{
+        String jobid, int fromEventId, int maxEvents) throws IOException{
       TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
       JobInProgress job = (JobInProgress)this.jobs.get(jobid);
       if (null != job) {
-        events = job.getTaskCompletionEvents(fromEventId);
+        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
       }
       return events;
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Feb 22 12:22:15 2007
@@ -251,7 +251,7 @@
   public JobStatus[] jobsToComplete() {return null;}
 
   public TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId) throws IOException{
+      String jobid, int fromEventId, int maxEvents) throws IOException{
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
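Both tracker-side entry points above bottom out in the JobInProgress paging shown earlier, where the subList bounds must be clamped or the call would throw IndexOutOfBoundsException. A self-contained mirror of that arithmetic, with a worked case:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class EventWindowSketch {
  static <T> List<T> window(List<T> events, int fromEventId, int maxEvents) {
    if (fromEventId >= events.size()) {
      return Collections.emptyList();        // nothing new yet
    }
    // clamp so the window never runs past the end of the list
    int actualMax = Math.min(maxEvents, events.size() - fromEventId);
    return events.subList(fromEventId, fromEventId + actualMax);
  }

  public static void main(String[] args) {
    List<Integer> log = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
    // 12 events, cursor at 10, window of 5 -> only events 10 and 11 remain
    System.out.println(window(log, 10, 5));  // prints [10, 11]
  }
}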
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Thu Feb 22 12:22:15 2007
@@ -28,6 +28,7 @@
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.IntWritable;
 
 import java.io.*;
 import java.util.*;
@@ -130,9 +131,19 @@
   /**
    * the number of map output locations to poll for at one time
+   */
+  private int probe_sample_size = 50;
+  
+  /**
+   * a Random used during the map output fetching
    */
-  private static final int PROBE_SAMPLE_SIZE = 50;
-  
+  private Random randForProbing;
+  
+  /**
+   * a hashmap from mapId to MapOutputLocation for retrials
+   */
+  private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
+  
   /** Represents the result of an attempt to copy a map output */
   private class CopyResult {
@@ -417,6 +428,10 @@
     Random backoff = new Random();
     final Progress copyPhase = getTask().getProgress().phase();
     
+    //tweak the probe sample size (make it a function of numCopiers)
+    probe_sample_size = Math.max(numCopiers*5, 50);
+    randForProbing = new Random(reduceTask.getPartition() * 100);
+    
     for (int i = 0; i < numOutputs; i++) {
       neededOutputs.add(new Integer(i));
       copyPhase.addPhase();       // add sub-phase per file
@@ -434,6 +449,8 @@
     // start the clock for bandwidth measurement
     long startTime = System.currentTimeMillis();
     long currentTime = startTime;
+    IntWritable fromEventId = new IntWritable(0);
+    
     PingTimer pingTimer = new PingTimer();
     pingTimer.setName("Map output copy reporter for task " + 
                       reduceTask.getTaskId());
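Why the retryFetches map declared above is needed before the main-loop hunk that follows: the fromEventId cursor only moves forward, so a location whose copy failed will never be reported by the JobTracker again — the runner must remember it itself, unless a newer event (a re-execution of that map) supersedes the cached entry. A small sketch of that bookkeeping, with Location standing in for MapOutputLocation:

import java.util.HashMap;
import java.util.Map;

class RetryCacheSketch {
  static class Location {                    // stand-in for MapOutputLocation
    final int mapId;
    final String host;
    Location(int mapId, String host) { this.mapId = mapId; this.host = host; }
  }

  final Map<Integer, Location> retryFetches = new HashMap<Integer, Location>();

  void copyFailed(Location loc) {
    // remember the last known location so we can retry it next round
    retryFetches.put(Integer.valueOf(loc.mapId), loc);
  }

  /** A fresh event arrived for this map id: prefer it over any stale entry. */
  Location freshLocationSeen(Location fresh) {
    retryFetches.remove(Integer.valueOf(fresh.mapId));
    return fresh;
  }
}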
@@ -450,17 +467,36 @@
       LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                " map output location(s)");
       try {
-        MapOutputLocation[] locs = queryJobTracker(neededOutputs, jobClient);
+        // the call to queryJobTracker will modify fromEventId to a value
+        // that it should be for the next call to queryJobTracker
+        MapOutputLocation[] locs = queryJobTracker(fromEventId, jobClient);
         
         // remove discovered outputs from needed list
         // and put them on the known list
+        int gotLocs = (locs == null ? 0 : locs.length);
         for (int i=0; i < locs.length; i++) {
-          neededOutputs.remove(new Integer(locs[i].getMapId()));
-          knownOutputs.add(locs[i]);
+          // check whether we actually need an output. It could happen
+          // that a map task that successfully ran earlier got lost, but
+          // if we already have copied the output of that unfortunate task
+          // we need not copy it again from the new TT (we will ignore 
+          // the event for the new rescheduled execution)
+          if(neededOutputs.remove(new Integer(locs[i].getMapId()))) {
+            // remove the mapId from the retryFetches hashmap since we now
+            // prefer the new location instead of what we saved earlier
+            retryFetches.remove(new Integer(locs[i].getMapId()));
+            knownOutputs.add(locs[i]);
+          }
+          else gotLocs--; //we don't need this output
         }
+        // now put the remaining hash entries for the failed fetches 
+        // and clear the hashmap
+        knownOutputs.addAll(retryFetches.values());
         LOG.info(reduceTask.getTaskId() +
-                " Got " + (locs == null ? 0 : locs.length) + 
-                " map outputs from jobtracker");
+                " Got " + gotLocs + 
+                " new map outputs from jobtracker and " + retryFetches.size() +
+                " map outputs from previous failures");
+        retryFetches.clear();
       }
       catch (IOException ie) {
         LOG.warn(reduceTask.getTaskId() +
@@ -534,6 +570,7 @@
         } else {
           // this copy failed, put it back onto neededOutputs
           neededOutputs.add(new Integer(cr.getMapId()));
+          retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
         
           // wait a random amount of time for next contact
           currentTime = System.currentTimeMillis();
@@ -553,6 +590,7 @@
           while (locIt.hasNext()) {
             MapOutputLocation loc = (MapOutputLocation)locIt.next();
             if (cr.getHost().equals(loc.getHost())) {
+              retryFetches.put(new Integer(loc.getMapId()), loc);
              locIt.remove();
              neededOutputs.add(new Integer(loc.getMapId()));
            }
@@ -563,7 +601,7 @@
       }
       
       // ensure we have enough to keep us busy
-      if (numInFlight < lowThreshold && (numOutputs-numCopied) > PROBE_SAMPLE_SIZE) {
+      if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
         break;
       }
     }
@@ -658,28 +696,16 @@
   }
 
   /** Queries the job tracker for a set of outputs ready to be copied
-   * @param neededOutputs the list of currently unknown outputs
+   * @param fromEventId the first event ID we want to start from, this will be
+   * modified by the call to this method
    * @param jobClient the job tracker
    * @return a set of locations to copy outputs from
   * @throws IOException
   */
-  private MapOutputLocation[] queryJobTracker(List neededOutputs,
+  private MapOutputLocation[] queryJobTracker(IntWritable fromEventId,
                                               InterTrackerProtocol jobClient)
   throws IOException {
-    
-    // query for a just a random subset of needed segments so that we don't
-    // overwhelm jobtracker.  ideally perhaps we could send a more compact
-    // representation of all needed, i.e., a bit-vector
-    int checkSize = Math.min(PROBE_SAMPLE_SIZE, neededOutputs.size());
-    int neededIds[] = new int[checkSize];
-      
-    Collections.shuffle(neededOutputs);
-      
-    ListIterator itr = neededOutputs.listIterator();
-    for (int i=0; i < checkSize; i++) {
-      neededIds[i] = ((Integer)itr.next()).intValue();
-    }
-
     long currentTime = System.currentTimeMillis();
     long pollTime = lastPollTime + MIN_POLL_INTERVAL;
     while (currentTime < pollTime) {
@@ -690,9 +716,28 @@
     }
     lastPollTime = currentTime;
 
-    return jobClient.locateMapOutputs(reduceTask.getJobId().toString(), 
-                                      neededIds,
-                                      reduceTask.getPartition());
+    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+                                          reduceTask.getJobId().toString(),
+                                          fromEventId.get(),
+                                          probe_sample_size);
+    
+    List <MapOutputLocation> mapOutputsList = new ArrayList();
+    for (int i = 0; i < t.length; i++) {
+      if (t[i].isMap && 
+          t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+        URI u = URI.create(t[i].getTaskTrackerHttp());
+        String host = u.getHost();
+        int port = u.getPort();
+        String taskId = t[i].getTaskId();
+        int mId = t[i].idWithinJob();
+        mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+      }
+    }
+    Collections.shuffle(mapOutputsList, randForProbing);
+    
+    MapOutputLocation[] locations =
+                        new MapOutputLocation[mapOutputsList.size()];
+    fromEventId.set(fromEventId.get() + t.length);
+    return mapOutputsList.toArray(locations);
  }
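A note on the partition-seeded shuffle in queryJobTracker: every reduce receives the same events in the same order, so without shuffling (or with identically seeded shuffles) all reduces would fetch from the same TaskTrackers in lock-step, hot-spotting one host at a time. Seeding with the partition makes each reduce's visiting order different (typically) yet repeatable. A runnable illustration, with made-up tracker names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class SeededShuffleSketch {
  public static void main(String[] args) {
    List<String> base = Arrays.asList("tt1", "tt2", "tt3", "tt4", "tt5");
    for (int partition = 0; partition < 3; partition++) {
      List<String> order = new ArrayList<String>(base);
      // same seed expression as the diff: partition * 100
      Collections.shuffle(order, new Random(partition * 100));
      System.out.println("reduce " + partition + " fetch order: " + order);
    }
  }
}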
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Thu Feb 22 12:22:15 2007
@@ -12,12 +12,14 @@
  *
  */
 public class TaskCompletionEvent implements Writable{
-  static public enum Status {FAILED, SUCCEEDED};
+  static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
   
   private int eventId; 
   private String taskTrackerHttp;
   private String taskId;
   Status status; 
+  boolean isMap = false; 
+  private int idWithinJob; 
   public static final TaskCompletionEvent[] EMPTY_ARRAY = 
     new TaskCompletionEvent[0];
   /**
@@ -35,11 +37,15 @@
    * @param taskTrackerHttp task tracker's host:port for http. 
    */
   public TaskCompletionEvent(int eventId, 
-                             String taskId,
+                             String taskId,
+                             int idWithinJob,
+                             boolean isMap,
                              Status status, 
                              String taskTrackerHttp){
-    this.taskId = taskId;
+    this.taskId = taskId;
+    this.idWithinJob = idWithinJob;
+    this.isMap = isMap;
     this.eventId = eventId; 
     this.status = status;
     this.taskTrackerHttp = taskTrackerHttp;
@@ -114,17 +120,28 @@
     return buf.toString();
   }
 
+  public boolean isMapTask() {
+    return isMap;
+  }
+  
+  public int idWithinJob() {
+    return idWithinJob;
+  }
   //////////////////////////////////////////////
   // Writable
   //////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeString(out, taskId); 
+    WritableUtils.writeVInt(out, idWithinJob);
+    out.writeBoolean(isMap);
     WritableUtils.writeEnum(out, status); 
     WritableUtils.writeString(out, taskTrackerHttp);
   }
  
   public void readFields(DataInput in) throws IOException {
     this.taskId = WritableUtils.readString(in); 
+    this.idWithinJob = WritableUtils.readVInt(in);
+    this.isMap = in.readBoolean();
     this.status = WritableUtils.readEnum(in, Status.class);
     this.taskTrackerHttp = WritableUtils.readString(in);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=510644&r1=510643&r2=510644
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Feb 22 12:22:15 2007
@@ -63,6 +63,7 @@
   private JobInProgress job;
 
   // Status of the TIP
+  private int successEventNumber = -1;
   private int numTaskFailures = 0;
   private double progress = 0;
   private String state = "";
@@ -144,6 +145,15 @@
   }
     
   /**
+   * Return the index of the tip within the job, so "tip_0002_m_012345"
+   * would return 12345;
+   * @return int the tip index
+   */
+  public int idWithinJob() {
+    return partition;
+  }    
+
+  /**
    * Initialization common to Map and Reduce
    */
   void init(String jobUniqueString) {
@@ -573,5 +583,19 @@
    */
   public int getIdWithinJob() {
     return partition;
+  }
+  
+  /**
+   * Set the event number that was raised for this tip
+   */
+  public void setSuccessEventNumber(int eventNumber) {
+    successEventNumber = eventNumber;
+  }
+
+  /**
+   * Get the event number that was raised for this tip
+   */
+  public int getSuccessEventNumber() {
+    return successEventNumber;
   }
 }
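Finally, note that the TaskCompletionEvent wire format gained two fields in the middle of the record, which is the concrete reason both protocol versions were bumped: an old reader would parse the idWithinJob bytes as the status enum. A stand-alone mirror of the new field order, using plain java.io in place of Hadoop's WritableUtils shown above:

import java.io.DataOutput;
import java.io.IOException;

class EventWireSketch {
  static void writeEvent(DataOutput out, String taskId, int idWithinJob,
                         boolean isMap, String status, String trackerHttp)
      throws IOException {
    out.writeUTF(taskId);          // 1. unchanged
    out.writeInt(idWithinJob);     // 2. new in this commit (a VInt in the real code)
    out.writeBoolean(isMap);       // 3. new in this commit
    out.writeUTF(status);          // 4. the enum name in the real code
    out.writeUTF(trackerHttp);     // 5. unchanged
  }
}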