Author: cutting
Date: Thu May 3 12:51:14 2007
New Revision: 534975

URL: http://svn.apache.org/viewvc?view=rev&rev=534975
Log:
HADOOP-1144.  Permit one to specify the maximum percentage of tasks that can
fail before a job is aborted.  Contributed by Arun.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May 3 12:51:14 2007
@@ -325,6 +325,10 @@
 96. HADOOP-1322.  Fix TaskTracker blacklisting to work correctly in
     one- and two-node clusters.  (Arun C Murthy via cutting)
 
+97. HADOOP-1144.  Permit one to specify a maximum percentage of tasks
+    that can fail before a job is aborted.  The default is zero.
+    (Arun C Murthy via cutting)
+
 Release 0.12.3 - 2007-04-06

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu May 3 12:51:14 2007
@@ -613,6 +613,50 @@
   public int getMaxTaskFailuresPerTracker() {
     return getInt("mapred.max.tracker.failures", 4); 
   }
+  
+  /**
+   * Get the maximum percentage of map tasks that can fail without
+   * the job being aborted.
+   * 
+   * @return the maximum percentage of map tasks that can fail without
+   *         the job being aborted
+   */
+  public int getMaxMapTaskFailuresPercent() {
+    return getInt("mapred.max.map.failures.percent", 0);
+  }
+
+  /**
+   * Set the maximum percentage of map tasks that can fail without the job
+   * being aborted.
+   * 
+   * @param percent the maximum percentage of map tasks that can fail without
+   *                the job being aborted
+   */
+  public void setMaxMapTaskFailuresPercent(int percent) {
+    setInt("mapred.max.map.failures.percent", percent);
+  }
+  
+  /**
+   * Get the maximum percentage of reduce tasks that can fail without
+   * the job being aborted.
+   * 
+   * @return the maximum percentage of reduce tasks that can fail without
+   *         the job being aborted
+   */
+  public int getMaxReduceTaskFailuresPercent() {
+    return getInt("mapred.max.reduce.failures.percent", 0);
+  }
+  
+  /**
+   * Set the maximum percentage of reduce tasks that can fail without the job
+   * being aborted.
+   * 
+   * @param percent the maximum percentage of reduce tasks that can fail without
+   *                the job being aborted
+   */
+  public void setMaxReduceTaskFailuresPercent(int percent) {
+    setInt("mapred.max.reduce.failures.percent", percent);
+  }
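
For context, enabling the new knobs from user code is one line per phase.  A
minimal sketch, assuming only the two setters added above (equivalently, the
raw mapred.max.map.failures.percent / mapred.max.reduce.failures.percent keys)
plus the usual JobConf/JobClient driver boilerplate; the job name and driver
class are illustrative:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class LossyJobDriver {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(LossyJobDriver.class);
        conf.setJobName("lossy-grep");   // illustrative name

        // Tolerate up to 10% failed map TIPs and 5% failed reduce TIPs
        // before the JobTracker aborts the job; the default is zero.
        conf.setMaxMapTaskFailuresPercent(10);
        conf.setMaxReduceTaskFailuresPercent(5);

        // ... mapper/reducer classes and input/output paths as usual ...

        JobClient.runJob(conf);
      }
    }

Note that the percentages are charged per task-in-progress (TIP), not per
attempt: as the JobInProgress changes below show, a TIP counts against the
budget only once it has exhausted its per-task retry attempts.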
 
   /** Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu May 3 12:51:14 2007
@@ -63,7 +63,13 @@
   int finishedMapTasks = 0;
   int finishedReduceTasks = 0;
   int failedMapTasks = 0; 
-  int failedReduceTasks = 0;
+  int failedReduceTasks = 0;
+  
+  int mapFailuresPercent = 0;
+  int reduceFailuresPercent = 0;
+  int failedMapTIPs = 0;
+  int failedReduceTIPs = 0;
+  
   JobTracker jobtracker = null;
   Map<String,List<TaskInProgress>> hostToMaps =
     new HashMap<String,List<TaskInProgress>>();
@@ -91,7 +97,14 @@
   private LocalFileSystem localFs;
   private String uniqueString;
-
+  
+  // Per-job counters
+  public static enum Counter { 
+    NUM_FAILED_MAPS,
+    NUM_FAILED_REDUCES
+  }
+  private Counters jobCounters = new Counters();
+  
   private Counters mapCounters = new Counters();
   private Counters reduceCounters = new Counters();
   private MetricsRecord jobMetrics;
@@ -130,6 +143,9 @@
     this.numReduceTasks = conf.getNumReduceTasks();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
         numMapTasks + numReduceTasks + 10);
+    
+    this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+    this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
 
     JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(),
         System.currentTimeMillis(), jobFile);
@@ -359,14 +375,6 @@
       tip.setSuccessEventNumber(taskCompletionEventTracker);
     } else if (state == TaskStatus.State.FAILED ||
                state == TaskStatus.State.KILLED) {
-      taskEvent = new TaskCompletionEvent(
-                                          taskCompletionEventTracker, 
-                                          status.getTaskId(),
-                                          tip.idWithinJob(),
-                                          status.getIsMap(),
-                                          TaskCompletionEvent.Status.FAILED,
-                                          httpTaskLogLocation
-                                          );
       // Get the event number for the (possibly) previously successful
       // task. If there exists one, then set that status to OBSOLETE 
       int eventNumber;
@@ -376,9 +384,24 @@
         if (t.getTaskId().equals(status.getTaskId()))
           t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
       }
+      // Tell the job to fail the relevant task
       failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
-                 wasRunning, wasComplete);
+                 wasRunning, wasComplete, metrics);
+
+      // Did the task failure lead to tip failure?
+      TaskCompletionEvent.Status taskCompletionStatus = 
+        TaskCompletionEvent.Status.FAILED;
+      if (tip.isFailed()) {
+        taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+      }
+      taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
+                                          status.getTaskId(),
+                                          tip.idWithinJob(),
+                                          status.getIsMap(),
+                                          taskCompletionStatus, 
+                                          httpTaskLogLocation
+                                          );
     }
 
     // Add the 'complete' task i.e. successful/failed
@@ -411,7 +434,16 @@
       }
     }
   }
-
+  
+  /**
+   * Returns the job-level counters.
+   * 
+   * @return the job-level counters.
+   */
+  public synchronized Counters getJobCounters() {
+    return jobCounters;
+  }
+  
   /**
    * Returns map phase counters by summing over all map tasks in progress.
    */
@@ -427,11 +459,12 @@
   }
   
   /**
-   * Returns the total job counters, by adding together the map and the
-   * reduce counters.
+   * Returns the total job counters, by adding together the job,
+   * the map and the reduce counters.
    */
   public Counters getCounters() {
-    return Counters.sum(getMapCounters(), getReduceCounters());
+    return Counters.sum(getJobCounters(), 
+                        Counters.sum(getMapCounters(), getReduceCounters()));
   }
   
   /**
@@ -741,9 +774,27 @@
     //
     // Figure out whether the Job is done
     //
+    isJobComplete(tip, metrics);
+    
+    if (this.status.getRunState() != JobStatus.RUNNING) {
+      // The job has been killed/failed, 
+      // JobTracker should cleanup this task
+      jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+    }
+  }
+  
+  /**
+   * Check if the job is done since all its component tasks are either
+   * successful or have failed.
+   * 
+   * @param tip the current tip which completed either successfully or failed
+   * @param metrics job-tracker metrics
+   * @return true if the job is complete, else false
+   */
+  private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
     boolean allDone = true;
     for (int i = 0; i < maps.length; i++) {
-      if (!maps[i].isComplete()) {
+      if (!(maps[i].isComplete() || maps[i].isFailed())) {
         allDone = false;
         break;
       }
@@ -753,7 +804,7 @@
       this.status.setMapProgress(1.0f);
     }
     for (int i = 0; i < reduces.length; i++) {
-      if (!reduces[i].isComplete()) {
+      if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
         allDone = false;
         break;
       }
@@ -773,13 +824,11 @@
       JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, 
           this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, 
          failedReduceTasks);
       metrics.completeJob();
-    } else if (this.status.getRunState() != JobStatus.RUNNING) {
-      // The job has been killed/failed,
-      // JobTracker should cleanup this task
-      jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+      return true;
     }
+    
+    return false;
   }
-
   /**
    * Kill the job and all its component tasks.
    */
@@ -809,7 +858,7 @@
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution.  However, after
    * many repeated failures we may instead decide to allow the entire 
-   * job to fail.
+   * job to fail, or to succeed if the user doesn't care about a few tasks failing.
    *
    * Even if a task has reported as completed in the past, it might later
    * be reported as failed.  That's because the TaskTracker that hosts a map
@@ -819,7 +868,8 @@
    */
   private void failedTask(TaskInProgress tip, String taskid, 
                           TaskStatus status, String trackerName,
-                          boolean wasRunning, boolean wasComplete) {
+                          boolean wasRunning, boolean wasComplete, 
+                          JobTrackerMetrics metrics) {
     // Mark the taskid as a 'failure'
     tip.incompleteSubTask(taskid, trackerName);
@@ -881,16 +931,45 @@
     jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
 
     //
-    // Check if we need to kill the job because of too many failures
+    // Check if we need to kill the job because of too many failures or 
+    // if the job is complete since all component tasks have completed
    //
     if (tip.isFailed()) {
-      LOG.info("Aborting job " + profile.getJobId());
-      JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
-          tip.isMapTask() ? Values.MAP.name():Values.REDUCE.name(),  
-          System.currentTimeMillis(), status.getDiagnosticInfo());
-      JobHistory.JobInfo.logFailed(profile.getJobId(),
-          System.currentTimeMillis(), this.finishedMapTasks, 
-          this.finishedReduceTasks);
-      kill();
+      //
+      // Allow up to 'mapFailuresPercent' of map tasks to fail or
+      // 'reduceFailuresPercent' of reduce tasks to fail
+      //
+      boolean killJob = 
+        tip.isMapTask() ? 
+            (((++failedMapTIPs*100)/numMapTasks) > mapFailuresPercent) :
+            (((++failedReduceTIPs*100)/numReduceTasks) > reduceFailuresPercent);
+      
+      if (killJob) {
+        LOG.info("Aborting job " + profile.getJobId());
+        JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
+                                  tip.isMapTask() ? 
+                                      Values.MAP.name() : 
+                                      Values.REDUCE.name(),  
+                                  System.currentTimeMillis(), 
+                                  status.getDiagnosticInfo());
+        JobHistory.JobInfo.logFailed(profile.getJobId(), 
+                                     System.currentTimeMillis(), 
+                                     this.finishedMapTasks, 
+                                     this.finishedReduceTasks
+                                    );
+        kill();
+      } else {
+        isJobComplete(tip, metrics);
+      }
+      
+      //
+      // Update the counters
+      //
+      if (tip.isMapTask()) {
+        jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
+      } else {
+        jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+      }
     }
   }
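
One subtlety in the killJob test above: the percentage is computed with
integer division, so the job is aborted only once the failed fraction
strictly exceeds the configured percent.  A standalone sketch (hypothetical
names; it mirrors the expression in the diff rather than calling Hadoop code)
shows the boundary behavior:

    public class FailureThresholdDemo {
      // Mirrors JobInProgress: kill once failed TIPs strictly exceed the percent.
      static boolean shouldKill(int failedTIPs, int numTasks, int failuresPercent) {
        return ((failedTIPs * 100) / numTasks) > failuresPercent;
      }

      public static void main(String[] args) {
        int numMapTasks = 200;
        int mapFailuresPercent = 5;   // tolerate 5% failed map TIPs
        for (int failed = 10; failed <= 12; failed++) {
          System.out.println(failed + " failed -> kill=" +
              shouldKill(failed, numMapTasks, mapFailuresPercent));
        }
        // 10 -> false (1000/200 = 5, not > 5)
        // 11 -> false (1100/200 truncates to 5)
        // 12 -> true  (1200/200 = 6 > 5)
      }
    }

Because of the truncation, a job can survive slightly past the nominal
percentage (here, 11 failed TIPs out of 200 rather than 10).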

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu May 3 12:51:14 2007
@@ -822,43 +822,39 @@
       copyProgress.start();
       try {
         // loop until we get all required outputs
-        while (numCopied < numOutputs && mergeThrowable == null) {
+        while (!neededOutputs.isEmpty() && mergeThrowable == null) {
           
-          LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
-                   " map output(s)");
+          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+                   " map output(s)");
           
-          if (!neededOutputs.isEmpty()) {
-            LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
-                     " map output location(s)");
-            try {
-              // Put the hash entries for the failed fetches. Entries here 
-              // might be replaced by (mapId) hashkeys from new successful 
-              // Map executions, if the fetch failures were due to lost tasks.
-              // The replacements, if at all, will happen when we query the 
-              // tasktracker and put the mapId hashkeys with new 
-              // MapOutputLocations as values
-              knownOutputs.putAll(retryFetches);
-              // The call getSuccessMapEvents will modify fromEventId to a val
-              // that it should be for the next call to getSuccessMapEvents
-              List <MapOutputLocation> locs = getSuccessMapEvents(fromEventId);
-              
-              // put discovered them on the known list
-              for (int i=0; i < locs.size(); i++) {
-                knownOutputs.put(new Integer(locs.get(i).getMapId()), 
-                                 locs.get(i));
-              }
-              LOG.info(reduceTask.getTaskId() +
-                       " Got " + locs.size() + 
-                       " new map outputs from tasktracker and " + retryFetches.size()
-                       + " map outputs from previous failures");
-              // clear the "failed" fetches hashmap
-              retryFetches.clear();
-            }
-            catch (IOException ie) {
-              LOG.warn(reduceTask.getTaskId() +
-                       " Problem locating map outputs: " +
-                       StringUtils.stringifyException(ie));
+          try {
+            // Put the hash entries for the failed fetches. Entries here
+            // might be replaced by (mapId) hashkeys from new successful
+            // Map executions, if the fetch failures were due to lost tasks.
+            // The replacements, if at all, will happen when we query the
+            // tasktracker and put the mapId hashkeys with new
+            // MapOutputLocations as values
+            knownOutputs.putAll(retryFetches);
+            // The call getMapCompletionEvents will modify fromEventId to a val
+            // that it should be for the next call to getMapCompletionEvents
+            List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId);
+            
+            // put the discovered outputs on the known list
+            for (int i=0; i < locs.size(); i++) {
+              knownOutputs.put(new Integer(locs.get(i).getMapId()), 
+                               locs.get(i));
+            }
+            LOG.info(reduceTask.getTaskId() +
+                     " Got " + locs.size() + 
+                     " new map outputs from tasktracker and " + retryFetches.size()
+                     + " map outputs from previous failures");
+            // clear the "failed" fetches hashmap
+            retryFetches.clear();
+          }
+          catch (IOException ie) {
+            LOG.warn(reduceTask.getTaskId() +
+                     " Problem locating map outputs: " +
+                     StringUtils.stringifyException(ie));
           }
           
           // now walk through the cache and schedule what we can
@@ -1009,7 +1005,7 @@
         if (inMemClosedFiles.length == 0) {
           LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
                    inMemFileSys.getUri());
-          return numCopied == numOutputs;
+          return neededOutputs.isEmpty();
         }
         //name this output file same as the name of the first file that is 
         //there in the current list of inmem files (this is guaranteed to be
@@ -1047,7 +1043,7 @@
             return false;
           }
         }
-        return mergeThrowable == null && numCopied == numOutputs;
+        return mergeThrowable == null && neededOutputs.isEmpty();
       } finally {
         inMemFileSys.close();
         copyProgress.interrupt();
@@ -1077,7 +1073,7 @@
      * @return a set of locations to copy outputs from
      * @throws IOException
      */
-    private List <MapOutputLocation> getSuccessMapEvents(IntWritable fromEventId)
+    private List <MapOutputLocation> getMapCompletionEvents(IntWritable fromEventId)
       throws IOException {
       
       long currentTime = System.currentTimeMillis();
@@ -1097,14 +1093,17 @@
       List <MapOutputLocation> mapOutputsList = 
         new ArrayList<MapOutputLocation>();
-      for (int i = 0; i < t.length; i++) {
-        if (t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
-          URI u = URI.create(t[i].getTaskTrackerHttp());
+      for (TaskCompletionEvent event : t) {
+        if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+          URI u = URI.create(event.getTaskTrackerHttp());
           String host = u.getHost();
           int port = u.getPort();
-          String taskId = t[i].getTaskId();
-          int mId = t[i].idWithinJob();
+          String taskId = event.getTaskId();
+          int mId = event.idWithinJob();
           mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+        } else if (event.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED) {
+          neededOutputs.remove(event.idWithinJob());
+          LOG.info("Ignoring output of failed map: '" + event.getTaskId() + "'");
         }
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Thu May 3 12:51:14 2007
@@ -12,7 +12,7 @@
  *
  */
 public class TaskCompletionEvent implements Writable{
-  static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
+  static public enum Status {FAILED, SUCCEEDED, OBSOLETE, TIPFAILED};
     
   private int eventId; 
   private String taskTrackerHttp;
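
The new TIPFAILED status also reaches job clients that poll completion
events, not just the shuffle.  A rough sketch of a consumer, assuming the
RunningJob.getTaskCompletionEvents(int) polling call of this era (the cursor
handling and class name are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    public class EventWatcher {
      public static void drain(RunningJob job) throws IOException {
        int fromEventId = 0;   // illustrative cursor
        TaskCompletionEvent[] events;
        while ((events = job.getTaskCompletionEvents(fromEventId)).length > 0) {
          fromEventId += events.length;
          for (TaskCompletionEvent event : events) {
            switch (event.getTaskStatus()) {
            case FAILED:
              // A single attempt failed; the TIP may still be retried.
              break;
            case TIPFAILED:
              // The whole TIP was written off, but the job lives on under
              // mapred.max.*.failures.percent; its output will never arrive.
              System.out.println("Giving up on task #" + event.idWithinJob());
              break;
            default:
              // SUCCEEDED or OBSOLETE
              break;
            }
          }
        }
      }
    }

This mirrors what getMapCompletionEvents() above does on the reduce side: a
TIPFAILED map is dropped from neededOutputs instead of being waited on
forever.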
Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu May 3 12:51:14 2007
@@ -160,7 +160,7 @@
 <%
     Counters mapCounters = job.getMapCounters();
     Counters reduceCounters = job.getReduceCounters();
-    Counters totalCounters = Counters.sum(mapCounters,reduceCounters);
+    Counters totalCounters = job.getCounters();
     
     for (String groupName : totalCounters.getGroupNames()) {
       Counters.Group totalGroup = totalCounters.getGroup(groupName);