Author: ddas
Date: Tue Oct 2 23:02:35 2007
New Revision: 581502

URL: http://svn.apache.org/viewvc?rev=581502&view=rev
Log:
HADOOP-1862. reduces are getting stuck trying to find map outputs.
(Arun C. Murthy via ddas)
Modified:
    lucene/hadoop/branches/branch-0.14/CHANGES.txt
    lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?rev=581502&r1=581501&r2=581502&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Tue Oct 2 23:02:35 2007
@@ -11,6 +11,9 @@
     HADOOP-1948. Removed spurious error message during block crc upgrade.
     (Raghu Angadi via dhruba)
 
+    HADOOP-1862. reduces are getting stuck trying to find map outputs.
+    (Arun C. Murthy via ddas)
+
     HADOOP-1977. Fixed handling of ToolBase cli options in JobClient.
     (enis via omalley)

Modified: lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml?rev=581502&r1=581501&r2=581502&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml (original)
+++ lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml Tue Oct 2 23:02:35 2007
@@ -820,7 +820,8 @@
   <value>FAILED</value>
   <description>The filter for controlling the output of the task's userlogs sent
                to the console of the JobClient.
-               The permissible options are: NONE, FAILED, SUCCEEDED and ALL.
+               The permissible options are: NONE, KILLED, FAILED, SUCCEEDED and
+               ALL.
   </description>
 </property>

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobClient.java?rev=581502&r1=581501&r2=581502&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobClient.java Tue Oct 2 23:02:35 2007
@@ -39,7 +39,7 @@
  *******************************************************/
 public class JobClient extends ToolBase implements MRConstants {
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
-  public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL }
+  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
   static long MAX_JOBPROFILE_AGE = 1000 * 2;

@@ -602,6 +602,11 @@
               TaskCompletionEvent.Status.FAILED){
             LOG.info(event.toString());
             displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+          }
+          break;
+        case KILLED:
+          if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
+            LOG.info(event.toString());
           }
           break;
         case ALL:
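In the JobClient switch above, each filter value admits exactly one event status, with ALL admitting everything. A minimal, self-contained sketch of that selection logic follows; the enum values mirror the patch, but EventFilterSketch and shouldLog are illustrative names, not Hadoop API:

    // Sketch only: how a KILLED filter value selects KILLED events for logging.
    public class EventFilterSketch {
      enum Status { FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED }
      enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

      static boolean shouldLog(TaskStatusFilter filter, Status status) {
        switch (filter) {
          case NONE:      return false;
          case KILLED:    return status == Status.KILLED;
          case FAILED:    return status == Status.FAILED;
          case SUCCEEDED: return status == Status.SUCCEEDED;
          case ALL:       return true;
          default:        return false;
        }
      }

      public static void main(String[] args) {
        System.out.println(shouldLog(TaskStatusFilter.KILLED, Status.KILLED)); // true
        System.out.println(shouldLog(TaskStatusFilter.KILLED, Status.FAILED)); // false
      }
    }

Note that the real JobClient additionally fetches task logs for FAILED events; the sketch deliberately leaves that out.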
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=581502&r1=581501&r2=581502&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Oct 2 23:02:35 2007
@@ -381,6 +381,7 @@
     TaskCompletionEvent taskEvent = null;
     if (state == TaskStatus.State.SUCCEEDED) {
+      boolean complete = false;
       taskEvent = new TaskCompletionEvent(
                                           taskCompletionEventTracker,
                                           status.getTaskId(),
@@ -388,9 +389,9 @@
                                           status.getIsMap(),
                                           TaskCompletionEvent.Status.SUCCEEDED,
                                           httpTaskLogLocation
-                                          );
+                                         );
       try {
-        completedTask(tip, status, metrics);
+        complete = completedTask(tip, status, metrics);
       } catch (IOException ioe) {
         // Oops! Failed to copy the task's output to its final place;
         // fail the task!
@@ -405,7 +406,12 @@
                  " with: " + StringUtils.stringifyException(ioe));
         return;
       }
-      tip.setSuccessEventNumber(taskCompletionEventTracker);
+
+      if (complete) {
+        tip.setSuccessEventNumber(taskCompletionEventTracker);
+      } else {
+        taskEvent.setTaskStatus(TaskCompletionEvent.Status.KILLED);
+      }
     } else if (state == TaskStatus.State.FAILED ||
                state == TaskStatus.State.KILLED) {
       // Get the event number for the (possibly) previously successful
@@ -424,7 +430,9 @@
 
       // Did the task failure lead to tip failure?
       TaskCompletionEvent.Status taskCompletionStatus =
-        TaskCompletionEvent.Status.FAILED;
+        (state == TaskStatus.State.FAILED) ?
+            TaskCompletionEvent.Status.FAILED :
+            TaskCompletionEvent.Status.KILLED;
       if (tip.isFailed()) {
         taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
       }
@@ -751,7 +759,7 @@
 
   /**
    * A taskid assigned to this JobInProgress has reported in successfully.
    */
-  public synchronized void completedTask(TaskInProgress tip,
+  public synchronized boolean completedTask(TaskInProgress tip,
                                          TaskStatus status,
                                          JobTrackerMetrics metrics)
   throws IOException {
@@ -766,7 +774,7 @@
       if (this.status.getRunState() != JobStatus.RUNNING) {
         jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
       }
-      return;
+      return false;
     }
 
     LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
@@ -818,7 +826,10 @@
       // The job has been killed/failed,
       // JobTracker should cleanup this task
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+      return false;
     }
+
+    return true;
   }
 
   /**
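The shape of the JobInProgress change is that completedTask(...) now reports whether this attempt's completion was actually accepted, and a redundant completion gets its event downgraded to KILLED so reduces never wait on that attempt's output. A toy model of that contract (all names here are illustrative; only the SUCCEEDED/KILLED distinction comes from the patch):

    import java.util.HashSet;
    import java.util.Set;

    public class DuplicateCompletionSketch {
      enum Status { SUCCEEDED, KILLED }

      private static final Set<String> completedTips = new HashSet<String>();

      // True only for the first accepted completion of a TIP, as with the
      // boolean now returned by completedTask().
      static boolean completedTask(String tipId) {
        return completedTips.add(tipId);
      }

      public static void main(String[] args) {
        String[][] attempts = { {"tip_0001", "attempt_0"},
                                {"tip_0001", "attempt_1"} };
        for (String[] a : attempts) {
          Status event = completedTask(a[0]) ? Status.SUCCEEDED : Status.KILLED;
          System.out.println(a[1] + " -> " + event);
        }
        // Prints: attempt_0 -> SUCCEEDED, attempt_1 -> KILLED
      }
    }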
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=581502&r1=581501&r2=581502&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Oct 2 23:02:35 2007
@@ -452,11 +452,17 @@
       new ArrayList<MapOutputLocation>();
 
     /**
-     * a TreeSet for needed map outputs
+     * The set of required map outputs
      */
     private Set <Integer> neededOutputs =
       Collections.synchronizedSet(new TreeSet<Integer>());
 
+    /**
+     * The set of obsolete map taskids.
+     */
+    private Set <String> obsoleteMapIds =
+      Collections.synchronizedSet(new TreeSet<String>());
+
     private Random random = null;
 
     /**
@@ -635,6 +641,9 @@
                    loc.getMapTaskId() + " from " + loc.getHost());
           LOG.warn(StringUtils.stringifyException(e));
           shuffleClientMetrics.failedFetch();
+
+          // Reset size so the failed fetch is reported as such
+          size = -1;
         } finally {
           shuffleClientMetrics.threadFree();
           finish(size);
@@ -648,7 +657,7 @@
       }
     }
 
-    /** Copies a a map output from a remote host, using raw RPC.
+    /** Copies a map output from a remote host, via HTTP.
     * @param currentLocation the map output location to be copied
     * @return the path (fully qualified) of the copied file
     * @throws IOException if there is an error copying the file
@@ -656,9 +665,12 @@
      */
     private long copyOutput(MapOutputLocation loc
                             ) throws IOException, InterruptedException {
-      if (!neededOutputs.contains(loc.getMapId())) {
+      // check if we still need to copy the output from this location
+      if (!neededOutputs.contains(loc.getMapId()) ||
+          obsoleteMapIds.contains(loc.getMapTaskId())) {
         return CopyResult.OBSOLETE;
       }
+
       String reduceId = reduceTask.getTaskId();
       LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
                " output from " + loc.getHost() + ".");
@@ -865,18 +877,21 @@
           // tasktracker and put the mapId hashkeys with new
           // MapOutputLocations as values
           knownOutputs.addAll(retryFetches);
-          // The call getsMapCompletionEvents will modify fromEventId to a val
-          // that it should be for the next call to getSuccessMapEvents
-          List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId);
+
+          // The call getMapCompletionEvents will update fromEventId to be
+          // used for the next call to getMapCompletionEvents
+          int currentNumKnownMaps = knownOutputs.size();
+          int currentNumObsoleteMapIds = obsoleteMapIds.size();
+          getMapCompletionEvents(fromEventId, knownOutputs);
+
+          LOG.info(reduceTask.getTaskId() + ": " +
+                   "Got " + (knownOutputs.size()-currentNumKnownMaps) +
+                   " new map-outputs & " +
+                   (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
+                   " obsolete map-outputs from tasktracker and " +
+                   retryFetches.size() + " map-outputs from previous failures"
+                   );
 
-          // put discovered them on the known list
-          for (int i=0; i < locs.size(); i++) {
-            knownOutputs.add(locs.get(i));
-          }
-          LOG.info(reduceTask.getTaskId() +
-                  " Got " + locs.size() +
-                  " new map outputs from tasktracker and " + retryFetches.size()
-                  + " map outputs from previous failures");
           // clear the "failed" fetches hashmap
           retryFetches.clear();
         }
@@ -904,6 +919,13 @@
           while (locIt.hasNext()) {
 
             MapOutputLocation loc = (MapOutputLocation)locIt.next();
+
+            // Do not schedule fetches from OBSOLETE maps
+            if (obsoleteMapIds.contains(loc.getMapTaskId())) {
+              locIt.remove();
+              continue;
+            }
+
             Long penaltyEnd = penaltyBox.get(loc.getHost());
             boolean penalized = false, duplicate = false;
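Both new ReduceTask checks reduce to a single predicate over the two bookkeeping sets: fetch a location only if its map output is still needed and its attempt id has not been declared obsolete. A runnable sketch of that predicate (the set names follow the patch; the class, method, and task-id strings are made up for illustration):

    import java.util.Collections;
    import java.util.Set;
    import java.util.TreeSet;

    public class ObsoleteFetchSketch {
      private static final Set<Integer> neededOutputs =
          Collections.synchronizedSet(new TreeSet<Integer>());
      private static final Set<String> obsoleteMapIds =
          Collections.synchronizedSet(new TreeSet<String>());

      // Mirrors the check at the top of copyOutput() and the scheduling loop.
      static boolean shouldFetch(int mapId, String mapTaskId) {
        return neededOutputs.contains(mapId)
            && !obsoleteMapIds.contains(mapTaskId);
      }

      public static void main(String[] args) {
        neededOutputs.add(0);
        neededOutputs.add(1);
        obsoleteMapIds.add("task_0001_m_000001_0"); // first attempt of map 1 died
        System.out.println(shouldFetch(0, "task_0001_m_000000_0")); // true
        System.out.println(shouldFetch(1, "task_0001_m_000001_0")); // false
        System.out.println(shouldFetch(1, "task_0001_m_000001_1")); // true, re-run
      }
    }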
@@ -1104,14 +1126,17 @@
       }
     }
 
-    /** Queries the task tracker for a set of outputs ready to be copied
+    /**
+     * Queries the task tracker for a set of map-completion events from
+     * a given event ID.
      * @param fromEventId the first event ID we want to start from, this is
-     * modified by the call to this method
+     *                    modified by the call to this method
      * @param jobClient the job tracker
      * @return a set of locations to copy outputs from
      * @throws IOException
      */
-    private List <MapOutputLocation> getMapCompletionEvents(IntWritable fromEventId)
+    private void getMapCompletionEvents(IntWritable fromEventId,
+                                        List<MapOutputLocation> knownOutputs)
       throws IOException {
 
       long currentTime = System.currentTimeMillis();
@@ -1122,31 +1147,54 @@
         } catch (InterruptedException ie) { } // IGNORE
         currentTime = System.currentTimeMillis();
       }
-      lastPollTime = currentTime;
 
-      TaskCompletionEvent t[] = umbilical.getMapCompletionEvents(
-                                                                 reduceTask.getJobId(),
-                                                                 fromEventId.get(),
-                                                                 probe_sample_size);
-
-      List <MapOutputLocation> mapOutputsList =
-        new ArrayList<MapOutputLocation>();
-      for (TaskCompletionEvent event : t) {
-        if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
-          URI u = URI.create(event.getTaskTrackerHttp());
-          String host = u.getHost();
-          int port = u.getPort();
-          String taskId = event.getTaskId();
-          int mId = event.idWithinJob();
-          mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
-        } else if (event.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED) {
-          neededOutputs.remove(event.idWithinJob());
-          LOG.info("Ignoring output of failed map: '" + event.getTaskId() + "'");
+      TaskCompletionEvent events[] =
+        umbilical.getMapCompletionEvents(reduceTask.getJobId(),
+                                         fromEventId.get(), probe_sample_size);
+
+      // Note the last successful poll time-stamp
+      lastPollTime = currentTime;
+
+      // Update the last seen event ID
+      fromEventId.set(fromEventId.get() + events.length);
+
+      // Process the TaskCompletionEvents:
+      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteMapIds to stop
+      //    fetching from those maps.
+      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+      //    outputs at all.
+      for (TaskCompletionEvent event : events) {
+        switch (event.getTaskStatus()) {
+          case SUCCEEDED:
+          {
+            URI u = URI.create(event.getTaskTrackerHttp());
+            String host = u.getHost();
+            int port = u.getPort();
+            String taskId = event.getTaskId();
+            int mId = event.idWithinJob();
+            knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
+          }
+          break;
+          case FAILED:
+          case KILLED:
+          case OBSOLETE:
+          {
+            obsoleteMapIds.add(event.getTaskId());
+            LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
+                     " map-task: '" + event.getTaskId() + "'");
+          }
+          break;
+          case TIPFAILED:
+          {
+            neededOutputs.remove(event.idWithinJob());
+            LOG.info("Ignoring output of failed map TIP: '" +
+                     event.getTaskId() + "'");
+          }
+          break;
         }
       }
-
-      fromEventId.set(fromEventId.get() + t.length);
-      return mapOutputsList;
+    }
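The rewritten loop above is the heart of the fix: every event status now has an explicit disposition. The following self-contained distillation shows the three dispositions side by side; the event data and collection element types are simplified stand-ins for Hadoop's:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    public class EventClassificationSketch {
      enum Status { FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED }

      public static void main(String[] args) {
        List<String> knownOutputs = new ArrayList<String>();
        Set<String> obsoleteMapIds = new TreeSet<String>();
        Set<Integer> neededOutputs = new TreeSet<Integer>();
        neededOutputs.add(0);
        neededOutputs.add(1);

        Status[] statuses = { Status.SUCCEEDED, Status.KILLED, Status.TIPFAILED };
        String[] taskIds = { "map_000000_0", "map_000001_0", "map_000001_1" };
        int[] mapIds = { 0, 1, 1 };

        for (int i = 0; i < statuses.length; i++) {
          switch (statuses[i]) {
            case SUCCEEDED:             // fetch this output later
              knownOutputs.add(taskIds[i]);
              break;
            case FAILED:
            case KILLED:
            case OBSOLETE:              // never fetch from this attempt
              obsoleteMapIds.add(taskIds[i]);
              break;
            case TIPFAILED:             // whole map is gone; stop waiting for it
              neededOutputs.remove(mapIds[i]);
              break;
          }
        }
        System.out.println("fetch: " + knownOutputs);         // [map_000000_0]
        System.out.println("skip: " + obsoleteMapIds);        // [map_000001_0]
        System.out.println("still needed: " + neededOutputs); // [0]
      }
    }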
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=581502&r1=581501&r2=581502&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Oct 2 23:02:35 2007
@@ -29,7 +29,7 @@
  * job tracker.
  */
 public class TaskCompletionEvent implements Writable{
-  static public enum Status {FAILED, SUCCEEDED, OBSOLETE, TIPFAILED};
+  static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
 
   private int eventId;
   private String taskTrackerHttp;

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=581502&r1=581501&r2=581502&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Oct 2 23:02:35 2007
@@ -513,14 +513,6 @@
     private IntWritable fromEventId;
     /** This is the cache of map events for a given job */
     private List<TaskCompletionEvent> allMapEvents;
-    /** This array will store indexes to "SUCCEEDED" map events from
-     * allMapEvents. The array is indexed by the mapId.
-     * The reason why we store the indexes is to quickly reset SUCCEEDED
-     * events to OBSOLETE. Thus ReduceTasks might also get to know about
-     * OBSOLETE events and avoid fetching map outputs from the corresponding
-     * locations.
-     */
-    private int indexToEventsCache[];
     /** What jobid this fetchstatus object is for*/
     private String jobId;
 
@@ -528,7 +520,6 @@
       this.fromEventId = new IntWritable(0);
       this.jobId = jobId;
       this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
-      this.indexToEventsCache = new int[numMaps];
     }
 
     public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
@@ -551,32 +542,7 @@
       List <TaskCompletionEvent> recentMapEvents =
         queryJobTracker(fromEventId, jobId, jobClient);
       synchronized (allMapEvents) {
-        for (TaskCompletionEvent t : recentMapEvents) {
-          TaskCompletionEvent.Status status = t.getTaskStatus();
-          allMapEvents.add(t);
-
-          if (status == TaskCompletionEvent.Status.SUCCEEDED) {
-            //store the index of the events cache for this success event.
-            indexToEventsCache[t.idWithinJob()] = allMapEvents.size();
-          }
-          else if (status == TaskCompletionEvent.Status.FAILED ||
-                   status == TaskCompletionEvent.Status.OBSOLETE) {
-            int idx = indexToEventsCache[t.idWithinJob()];
-            //if this map task was declared a success earlier, we will have
-            //idx > 0
-            if (idx > 0) {
-              //Mark the event as OBSOLETE and reset the index to 0. Note
-              //we access the 'idx - 1' entry. This is because while storing
-              //the idx in indexToEventsCache, we store the 'actual idx + 1'
-              //Helps us to eliminate the index array elements initialization
-              //to something like '-1'
-              TaskCompletionEvent obsoleteEvent = allMapEvents.get(idx - 1);
-              obsoleteEvent.setTaskStatus(
-                                          TaskCompletionEvent.Status.OBSOLETE);
-              indexToEventsCache[t.idWithinJob()] = 0;
-            }
-          }
-        }
+        allMapEvents.addAll(recentMapEvents);
       }
     }
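With reduces now reacting to FAILED/KILLED/OBSOLETE events themselves, the TaskTracker-side cache can be a plain append-only list: the reconciliation the removed indexToEventsCache used to perform in place now happens in the reduce's event handling. A sketch of the simplified cache shape (names approximate TaskTracker.FetchStatus, but the class itself is illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FetchStatusSketch {
      private final List<String> allMapEvents = new ArrayList<String>();

      // Events are now appended verbatim; no index cache, no in-place
      // rewriting of previously cached SUCCEEDED events to OBSOLETE.
      synchronized void addMapEvents(List<String> recentMapEvents) {
        allMapEvents.addAll(recentMapEvents);
      }

      // Serve a bounded slice starting at fromId, as getMapEvents() does.
      synchronized List<String> getMapEvents(int fromId, int max) {
        if (fromId >= allMapEvents.size()) {
          return new ArrayList<String>();
        }
        int to = Math.min(allMapEvents.size(), fromId + max);
        return new ArrayList<String>(allMapEvents.subList(fromId, to));
      }

      public static void main(String[] args) {
        FetchStatusSketch cache = new FetchStatusSketch();
        cache.addMapEvents(Arrays.asList("m0:SUCCEEDED", "m1:KILLED", "m1:SUCCEEDED"));
        System.out.println(cache.getMapEvents(0, 2)); // [m0:SUCCEEDED, m1:KILLED]
      }
    }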