Author: cutting Date: Fri Jun 15 14:29:48 2007 New Revision: 547790 URL: http://svn.apache.org/viewvc?view=rev&rev=547790 Log: HADOOP-1472. Fix so that timed-out tasks are counted as failures rather than as killed. Contributed by Arun.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547790&r1=547789&r2=547790 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 15 14:29:48 2007 @@ -130,6 +130,9 @@ 41. HADOOP-1457. Add counters for monitoring task assignments. (Arun C Murthy via tomwhite) + 42. HADOOP-1472. Fix so that timed-out tasks are counted as failures + rather than as killed. (Arun C Murthy via cutting) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=547790&r1=547789&r2=547790 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Jun 15 14:29:48 2007 @@ -215,7 +215,7 @@ } LOG.info("Received KillTaskAction for task: " + killAction.getTaskId()); - purgeTask(tip); + purgeTask(tip, false); } else { LOG.error("Non-delete action given to cleanup thread: " + action); @@ -623,7 +623,7 @@ new TreeMap<String, TaskInProgress>(); tasksToClose.putAll(tasks); for (TaskInProgress tip : tasksToClose.values()) { - tip.jobHasFinished(); + tip.jobHasFinished(false); } // Shutdown local RPC servers. Do them @@ -920,13 +920,13 @@ // time-period greater than the configured time-out long timeSinceLastReport = now - tip.getLastProgressReport(); if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) { - String msg = "Task failed to report status for " + - (timeSinceLastReport / 1000) + - " seconds. 
Killing."; + String msg = + "Task " + tip.getTask().getTaskId() + " failed to report status for " + + (timeSinceLastReport / 1000) + " seconds. Killing!"; LOG.info(tip.getTask().getTaskId() + ": " + msg); ReflectionUtils.logThreadInfo(LOG, "lost task", 30); tip.reportDiagnosticInfo(msg); - purgeTask(tip); + purgeTask(tip, true); } } } @@ -951,7 +951,7 @@ synchronized (rjob) { // Add this tips of this job to queue of tasks to be purged for (TaskInProgress tip : rjob.tasks) { - tip.jobHasFinished(); + tip.jobHasFinished(false); } // Delete the job directory for this // task if the job is done/failed @@ -974,17 +974,17 @@ * Remove the tip and update all relevant state. * * @param tip {@link TaskInProgress} to be removed. - * @param purgeJobFiles <code>true</code> if the job files are to be - * purged, <code>false</code> otherwise. + * @param wasFailure did the task fail or was it killed? */ - private void purgeTask(TaskInProgress tip) throws IOException { + private void purgeTask(TaskInProgress tip, boolean wasFailure) + throws IOException { if (tip != null) { LOG.info("About to purge task: " + tip.getTask().getTaskId()); // Remove the task from running jobs, // removing the job if it's the last task removeTaskFromJob(tip.getTask().getJobId(), tip); - tip.jobHasFinished(); + tip.jobHasFinished(wasFailure); } } @@ -1008,7 +1008,7 @@ " Killing task."; LOG.info(killMe.getTask().getTaskId() + ": " + msg); killMe.reportDiagnosticInfo(msg); - purgeTask(killMe); + purgeTask(killMe, false); } } } @@ -1105,7 +1105,7 @@ LOG.warn(msg); tip.reportDiagnosticInfo(msg); try { - tip.killAndCleanup(true); + tip.kill(true); } catch (IOException ie2) { LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" + StringUtils.stringifyException(ie2)); @@ -1187,7 +1187,6 @@ private boolean keepFailedTaskFiles; private boolean alwaysKeepTaskFiles; private TaskStatus taskStatus; - private boolean keepJobFiles; private long taskTimeout; /** @@ -1207,7 +1206,6 @@ getName(),
task.isMapTask()? TaskStatus.Phase.MAP: TaskStatus.Phase.SHUFFLE, task.getCounters()); - keepJobFiles = false; taskTimeout = (10 * 60 * 1000); } @@ -1236,7 +1234,6 @@ task.setConf(localJobConf); String keepPattern = localJobConf.getKeepTaskFilesPattern(); if (keepPattern != null) { - keepJobFiles = true; alwaysKeepTaskFiles = Pattern.matches(keepPattern, task.getTaskId()); } else { @@ -1408,50 +1405,36 @@ /** * We no longer need anything from this task, as the job has - * finished. If the task is still running, kill it (and clean up + * finished. If the task is still running, kill it and clean up. + * + * @param wasFailure did the task fail, as opposed to was it killed by + * the framework */ - public void jobHasFinished() throws IOException { - boolean killTask = false; + public void jobHasFinished(boolean wasFailure) throws IOException { + // Kill the task if it is still running synchronized(this){ - killTask = (getRunState() == TaskStatus.State.RUNNING); - if (killTask) { - killAndCleanup(false); - } - } - if (!killTask) { - cleanup(); - } - if (keepJobFiles) - return; - - synchronized(this){ - // Delete temp directory in case any task used PhasedFileSystem. - try{ - String systemDir = task.getConf().get("mapred.system.dir"); - Path taskTempDir = new Path(systemDir + "/" + - task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId()); - if (fs.exists(taskTempDir)){ - fs.delete(taskTempDir); - } - }catch(IOException e){ - LOG.warn("Error in deleting reduce temporary output", e); + if (getRunState() == TaskStatus.State.RUNNING) { + kill(wasFailure); } } + + // Cleanup on the finished task + cleanup(); } /** * Something went wrong and the task must be killed. * @param wasFailure was it a failure (versus a kill request)? 
*/ - public synchronized void killAndCleanup(boolean wasFailure - ) throws IOException { + public synchronized void kill(boolean wasFailure) throws IOException { if (runstate == TaskStatus.State.RUNNING) { wasKilled = true; if (wasFailure) { failures += 1; } runner.kill(); - runstate = TaskStatus.State.KILLED; + runstate = + (wasFailure) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED; } else if (runstate == TaskStatus.State.UNASSIGNED) { if (wasFailure) { failures += 1; @@ -1596,7 +1579,7 @@ LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message); TaskInProgress tip = runningTasks.get(taskId); tip.reportDiagnosticInfo("FSError: " + message); - purgeTask(tip); + purgeTask(tip, true); } public TaskCompletionEvent[] getMapCompletionEvents(