Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=741192&r1=741191&r2=741192&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Thu Feb 5 17:24:11 2009 @@ -41,7 +41,7 @@ // what state is the task in? public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, - COMMIT_PENDING} + COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN} private final TaskAttemptID taskid; private float progress; @@ -204,6 +204,12 @@ } this.phase = phase; } + + boolean inTaskCleanupPhase() { + return (this.phase == TaskStatus.Phase.CLEANUP && + (this.runState == TaskStatus.State.FAILED_UNCLEAN || + this.runState == TaskStatus.State.KILLED_UNCLEAN)); + } public boolean getIncludeCounters() { return includeCounters; @@ -261,9 +267,9 @@ /** * Update the status of the task. * + * @param runstate * @param progress * @param state - * @param phase * @param counters */ synchronized void statusUpdate(State runState, @@ -300,7 +306,33 @@ this.counters = status.getCounters(); this.outputSize = status.outputSize; } - + + /** + * Update specific fields of task status + * + * This update is done in JobTracker when a cleanup attempt of task + * reports its status. Then update only specific fields, not all. 
+ * + * @param runState + * @param progress + * @param state + * @param phase + * @param finishTime + */ + synchronized void statusUpdate(State runState, + float progress, + String state, + Phase phase, + long finishTime) { + setRunState(runState); + setProgress(progress); + setStateString(state); + setPhase(phase); + if (finishTime != 0) { + this.finishTime = finishTime; + } + } + /** * Clear out transient information after sending out a status-update * from either the {@link Task} to the {@link TaskTracker} or from the
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=741192&r1=741191&r2=741192&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 5 17:24:11 2009 @@ -183,7 +183,8 @@ private static final String SUBDIR = "taskTracker"; private static final String CACHEDIR = "archive"; private static final String JOBCACHE = "jobcache"; - private static final String PIDDIR = "pids"; + private static final String PID = "pid"; + private static final String OUTPUT = "output"; private JobConf originalConf; private JobConf fConf; private int maxCurrentMapTasks; @@ -412,38 +413,63 @@ static String getJobCacheSubdir() { return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE; } + + static String getLocalJobDir(String jobid) { + return getJobCacheSubdir() + Path.SEPARATOR + jobid; + } - static String getPidFilesSubdir() { - return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR; + static String getLocalTaskDir(String jobid, String taskid) { + return getLocalTaskDir(jobid, taskid, false) ; } - + + static String getIntermediateOutputDir(String jobid, String taskid) { + return getLocalTaskDir(jobid, taskid) + + Path.SEPARATOR + TaskTracker.OUTPUT ; + } + + static String getLocalTaskDir(String jobid, + String taskid, + boolean isCleanupAttempt) { + String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid; + if (isCleanupAttempt) { + taskDir = taskDir + ".cleanup"; + } + return taskDir; + } + + static String getPidFile(String jobid, + String taskid, + boolean isCleanup) { + return getLocalTaskDir(jobid, taskid, isCleanup) + + Path.SEPARATOR + PID; + } + /** * Get the pidFile path of a Task + * * @param tid the TaskAttemptID of the 
task for which pidFile's path is needed + * @param conf Configuration for local dir allocator + * @param isCleanup true if the task is cleanup attempt + * * @return pidFile's Path */ - public static Path getPidFilePath(TaskAttemptID tid, JobConf conf) { + static Path getPidFilePath(TaskAttemptID tid, + JobConf conf, + boolean isCleanup) { Path pidFileName = null; try { //this actually need not use a localdirAllocator since the PID //files are really small.. pidFileName = lDirAlloc.getLocalPathToRead( - (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tid), - conf); + getPidFile(tid.getJobID().toString(), tid.toString(), isCleanup), + conf); } catch (IOException i) { // PID file is not there - LOG.warn("Failed to get pidFile name for " + tid + " " + i); + LOG.warn("Failed to get pidFile name for " + tid + " " + + StringUtils.stringifyException(i)); } return pidFileName; } - public void removePidFile(TaskAttemptID tid) { - Path pidFilePath = getPidFilePath(tid, getJobConf()); - if (pidFilePath != null) { - try { - FileSystem.getLocal(getJobConf()).delete(pidFilePath, false); - } catch(IOException ie) {} - } - } public long getProtocolVersion(String protocol, long clientVersion) throws IOException { @@ -785,9 +811,9 @@ } catch(FileNotFoundException fe) { jobFileSize = -1; } - Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir() - + Path.SEPARATOR + jobId - + Path.SEPARATOR + "job.xml"), + Path localJobFile = lDirAlloc.getLocalPathForWrite( + getLocalJobDir(jobId.toString()) + + Path.SEPARATOR + "job.xml", jobFileSize, fConf); RunningJob rjob = addTaskToJob(jobId, tip); synchronized (rjob) { @@ -811,9 +837,9 @@ // create the 'work' directory // job-specific shared directory for use as scratch space - Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir() - + Path.SEPARATOR + jobId - + Path.SEPARATOR + "work"), fConf); + Path workDir = lDirAlloc.getLocalPathForWrite( + (getLocalJobDir(jobId.toString()) + + Path.SEPARATOR + "work"), 
fConf); if (!localFs.mkdirs(workDir)) { throw new IOException("Mkdirs failed to create " + workDir.toString()); @@ -835,8 +861,7 @@ // Here we check for and we check five times the size of jarFileSize // to accommodate for unjarring the jar file in work directory localJarFile = new Path(lDirAlloc.getLocalPathForWrite( - getJobCacheSubdir() - + Path.SEPARATOR + jobId + getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "jars", 5 * jarFileSize, fConf), "job.jar"); if (!localFs.mkdirs(localJarFile.getParent())) { @@ -1253,7 +1278,8 @@ for (TaskStatus taskStatus : status.getTaskReports()) { if (taskStatus.getRunState() != TaskStatus.State.RUNNING && taskStatus.getRunState() != TaskStatus.State.UNASSIGNED && - taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) { + taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && + !taskStatus.inTaskCleanupPhase()) { if (taskStatus.getIsMap()) { mapTotal--; } else { @@ -1370,7 +1396,8 @@ long now = System.currentTimeMillis(); for (TaskInProgress tip: runningTasks.values()) { if (tip.getRunState() == TaskStatus.State.RUNNING || - tip.getRunState() == TaskStatus.State.COMMIT_PENDING) { + tip.getRunState() == TaskStatus.State.COMMIT_PENDING || + tip.isCleaningup()) { // Check the per-job timeout interval for tasks; // an interval of '0' implies it is never timed-out long jobTaskTimeout = tip.getTaskTimeout(); @@ -1424,8 +1451,7 @@ // task if the job is done/failed if (!rjob.keepJobFiles){ directoryCleanupThread.addToQueue(getLocalFiles(fConf, - SUBDIR + Path.SEPARATOR + JOBCACHE + - Path.SEPARATOR + rjob.getJobID())); + getLocalJobDir(rjob.getJobID().toString()))); } // Remove this job rjob.tasks.clear(); @@ -1684,7 +1710,9 @@ } synchronized (tip) { //to make sure that there is no kill task action for this - if (tip.getRunState() != TaskStatus.State.UNASSIGNED) { + if (tip.getRunState() != TaskStatus.State.UNASSIGNED && + tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN && + tip.getRunState() != 
TaskStatus.State.KILLED_UNCLEAN) { //got killed externally while still in the launcher queue addFreeSlot(); continue; @@ -1705,7 +1733,8 @@ private TaskInProgress registerTask(LaunchTaskAction action, TaskLauncher launcher) { Task t = action.getTask(); - LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID()); + LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() + + " task's state:" + t.getState()); TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher); synchronized (this) { tasks.put(t.getTaskID(), tip); @@ -1727,10 +1756,6 @@ private void startNewTask(TaskInProgress tip) { try { localizeJob(tip); - if (isTaskMemoryManagerEnabled()) { - taskMemoryManager.addTask(tip.getTask().getTaskID(), - getVirtualMemoryForTask(tip.getJobConf())); - } } catch (Throwable e) { String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils.stringifyException(e)); @@ -1751,7 +1776,23 @@ } } } - + + void addToMemoryManager(TaskAttemptID attemptId, + JobConf conf, + String pidFile) { + if (isTaskMemoryManagerEnabled()) { + taskMemoryManager.addTask(attemptId, + getVirtualMemoryForTask(conf), pidFile); + } + } + + void removeFromMemoryManager(TaskAttemptID attemptId) { + // Remove the entry from taskMemoryManagerThread's data structures. + if (isTaskMemoryManagerEnabled()) { + taskMemoryManager.removeTask(attemptId); + } + } + /** * The server retry loop. * This while-loop attempts to connect to the JobTracker. It only @@ -1838,10 +1879,12 @@ localJobConf = null; taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 0.0f, - TaskStatus.State.UNASSIGNED, + task.getState(), diagnosticInfo.toString(), "initializing", getName(), + task.isTaskCleanupTask() ? + TaskStatus.Phase.CLEANUP : task.isMapTask()? 
TaskStatus.Phase.MAP: TaskStatus.Phase.SHUFFLE, task.getCounters()); @@ -1851,9 +1894,10 @@ private void localizeTask(Task task) throws IOException{ Path localTaskDir = - lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + - Path.SEPARATOR + task.getJobID() + Path.SEPARATOR + - task.getTaskID()), defaultJobConf ); + lDirAlloc.getLocalPathForWrite( + TaskTracker.getLocalTaskDir(task.getJobID().toString(), + task.getTaskID().toString(), task.isTaskCleanupTask()), + defaultJobConf ); FileSystem localFs = FileSystem.getLocal(fConf); if (!localFs.mkdirs(localTaskDir)) { @@ -1863,8 +1907,7 @@ // create symlink for ../work if it already doesnt exist String workDir = lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() - + Path.SEPARATOR + task.getJobID() + TaskTracker.getLocalJobDir(task.getJobID().toString()) + Path.SEPARATOR + "work", defaultJobConf).toString(); String link = localTaskDir.getParent().toString() @@ -1875,11 +1918,10 @@ // create the working-directory of the task Path cwd = lDirAlloc.getLocalPathForWrite( - TaskTracker.getJobCacheSubdir() - + Path.SEPARATOR + task.getJobID() - + Path.SEPARATOR + task.getTaskID() - + Path.SEPARATOR + MRConstants.WORKDIR, - defaultJobConf); + getLocalTaskDir(task.getJobID().toString(), + task.getTaskID().toString(), task.isTaskCleanupTask()) + + Path.SEPARATOR + MRConstants.WORKDIR, + defaultJobConf); if (!localFs.mkdirs(cwd)) { throw new IOException("Mkdirs failed to create " + cwd.toString()); @@ -1974,9 +2016,13 @@ * Kick off the task execution */ public synchronized void launchTask() throws IOException { - if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { + if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED || + this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN || + this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) { localizeTask(task); - this.taskStatus.setRunState(TaskStatus.State.RUNNING); + if (this.taskStatus.getRunState() == 
TaskStatus.State.UNASSIGNED) { + this.taskStatus.setRunState(TaskStatus.State.RUNNING); + } this.runner = task.createRunner(TaskTracker.this, this); this.runner.start(); this.taskStatus.setStartTime(System.currentTimeMillis()); @@ -1986,6 +2032,10 @@ } } + boolean isCleaningup() { + return this.taskStatus.inTaskCleanupPhase(); + } + /** * The task is reporting its progress */ @@ -1993,10 +2043,14 @@ { LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + "% " + taskStatus.getStateString()); - + // task will report its state as + // COMMIT_PENDING when it is waiting for commit response and + // when it is committing. + // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN if (this.done || (this.taskStatus.getRunState() != TaskStatus.State.RUNNING && - this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) { + this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && + !isCleaningup())) { //make sure we ignore progress messages after a task has //invoked TaskUmbilicalProtocol.done() or if the task has been //KILLED/FAILED @@ -2047,7 +2101,16 @@ * The task is reporting that it's done running */ public synchronized void reportDone() { - this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + if (isCleaningup()) { + if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { + this.taskStatus.setRunState(TaskStatus.State.FAILED); + } else if (this.taskStatus.getRunState() == + TaskStatus.State.KILLED_UNCLEAN) { + this.taskStatus.setRunState(TaskStatus.State.KILLED); + } + } else { + this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + } this.taskStatus.setProgress(1.0f); this.taskStatus.setFinishTime(System.currentTimeMillis()); this.done = true; @@ -2062,6 +2125,11 @@ return wasKilled; } + void reportTaskFinished() { + taskFinished(); + releaseSlot(); + } + /** * The task has actually finished running. 
*/ @@ -2088,7 +2156,23 @@ if (!done) { if (!wasKilled) { failures += 1; - taskStatus.setRunState(TaskStatus.State.FAILED); + /* State changes: + * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED + * FAILED_UNCLEAN -> FAILED + * KILLED_UNCLEAN -> KILLED + */ + if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.FAILED); + } else if (taskStatus.getRunState() == + TaskStatus.State.KILLED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.KILLED); + } else if (task.isMapOrReduce() && + taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) { + taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN); + } else { + taskStatus.setRunState(TaskStatus.State.FAILED); + } + removeFromMemoryManager(task.getTaskID()); // call the script here for the failed tasks. if (debugCommand != null) { String taskStdout =""; @@ -2114,9 +2198,10 @@ File workDir = null; try { workDir = new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() - + Path.SEPARATOR + task.getJobID() - + Path.SEPARATOR + task.getTaskID() + TaskTracker.getLocalTaskDir( + task.getJobID().toString(), + task.getTaskID().toString(), + task.isTaskCleanupTask()) + Path.SEPARATOR + MRConstants.WORKDIR, localJobConf). 
toString()); } catch (IOException e) { @@ -2169,14 +2254,14 @@ LOG.warn("Exception in add diagnostics!"); } } - } else { - taskStatus.setRunState(TaskStatus.State.KILLED); } taskStatus.setProgress(0.0f); } this.taskStatus.setFinishTime(System.currentTimeMillis()); needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || - taskStatus.getRunState() == TaskStatus.State.KILLED); + taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN || + taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN || + taskStatus.getRunState() == TaskStatus.State.KILLED); } // @@ -2286,7 +2371,8 @@ synchronized(this){ if (getRunState() == TaskStatus.State.RUNNING || getRunState() == TaskStatus.State.UNASSIGNED || - getRunState() == TaskStatus.State.COMMIT_PENDING) { + getRunState() == TaskStatus.State.COMMIT_PENDING || + isCleaningup()) { kill(wasFailure); } } @@ -2297,19 +2383,46 @@ /** * Something went wrong and the task must be killed. + * + * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN + * FAILED_UNCLEAN -> FAILED + * KILLED_UNCLEAN -> KILLED + * UNASSIGNED -> FAILED/KILLED * @param wasFailure was it a failure (versus a kill request)? */ public synchronized void kill(boolean wasFailure) throws IOException { + /* State changes: + * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED + * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN + * FAILED_UNCLEAN -> FAILED + * KILLED_UNCLEAN -> KILLED + * UNASSIGNED -> FAILED/KILLED + */ if (taskStatus.getRunState() == TaskStatus.State.RUNNING || - taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) { + taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING || + isCleaningup()) { wasKilled = true; if (wasFailure) { failures += 1; } runner.kill(); - taskStatus.setRunState((wasFailure) ? - TaskStatus.State.FAILED : - TaskStatus.State.KILLED); + if (task.isMapOrReduce()) { + taskStatus.setRunState((wasFailure) ? 
+ TaskStatus.State.FAILED_UNCLEAN : + TaskStatus.State.KILLED_UNCLEAN); + } else { + // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always + if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.FAILED); + } else if (taskStatus.getRunState() == + TaskStatus.State.KILLED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.KILLED); + } else { + taskStatus.setRunState((wasFailure) ? + TaskStatus.State.FAILED : + TaskStatus.State.KILLED); + } + } } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { if (wasFailure) { failures += 1; @@ -2318,6 +2431,7 @@ taskStatus.setRunState(TaskStatus.State.KILLED); } } + removeFromMemoryManager(task.getTaskID()); releaseSlot(); } @@ -2369,7 +2483,12 @@ synchronized (TaskTracker.this) { if (needCleanup) { - tasks.remove(taskId); + // see if tasks data structure is holding this tip. + // tasks could hold the tip for cleanup attempt, if cleanup attempt + // got launched before this method. + if (tasks.get(taskId) == this) { + tasks.remove(taskId); + } } synchronized (this){ if (alwaysKeepTaskFiles || @@ -2381,8 +2500,8 @@ } synchronized (this) { try { - String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR - + task.getJobID() + Path.SEPARATOR + taskId; + String taskDir = getLocalTaskDir(task.getJobID().toString(), + taskId.toString(), task.isTaskCleanupTask()); if (needCleanup) { if (runner != null) { //cleans up the output directory of the task (where map outputs @@ -2603,15 +2722,10 @@ } if (tip != null) { if (!commitPending) { - tip.taskFinished(); - // Remove the entry from taskMemoryManagerThread's data structures. - if (isTaskMemoryManagerEnabled()) { - taskMemoryManager.removeTask(taskid); - } - tip.releaseSlot(); + tip.reportTaskFinished(); } } else { - LOG.warn("Unknown child task finshed: "+taskid+". Ignored."); + LOG.warn("Unknown child task finished: "+taskid+". 
Ignored."); } } @@ -2838,15 +2952,13 @@ // Index file Path indexFileName = lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + - jobId + Path.SEPARATOR + - mapId + "/output" + "/file.out.index", conf); + TaskTracker.getIntermediateOutputDir(jobId, mapId) + + "/file.out.index", conf); // Map-output file Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + - jobId + Path.SEPARATOR + - mapId + "/output" + "/file.out", conf); + TaskTracker.getIntermediateOutputDir(jobId, mapId) + + "/file.out", conf); /** * Read the index file to get the information about where Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=741192&r1=741191&r2=741192&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Feb 5 17:24:11 2009 @@ -250,7 +250,8 @@ TaskStatus.State state = ts.getRunState(); if (ts.getIsMap() && ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED))) { + (state == TaskStatus.State.UNASSIGNED) || + ts.inTaskCleanupPhase())) { mapCount++; } } @@ -267,7 +268,8 @@ TaskStatus.State state = ts.getRunState(); if ((!ts.getIsMap()) && ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED))) { + (state == TaskStatus.State.UNASSIGNED) || + ts.inTaskCleanupPhase())) { reduceCount++; } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=741192&r1=741191&r2=741192&view=diff 
============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Feb 5 17:24:11 2009 @@ -52,9 +52,10 @@ * encapsulates the events and whether to reset events index. * Version 13 changed the getTask method signature for HADOOP-249 * Version 14 changed the getTask method signature for HADOOP-4232 + * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759 * */ - public static final long versionID = 14L; + public static final long versionID = 15L; /** * Called when a child task process starts, to get its task. Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=741192&r1=741191&r2=741192&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Thu Feb 5 17:24:11 2009 @@ -174,8 +174,10 @@ @Override public void abortTask(TaskAttemptContext context) { try { - context.progress(); - outputFileSystem.delete(workPath, true); + if (workPath != null) { + context.progress(); + outputFileSystem.delete(workPath, true); + } } catch (IOException ie) { LOG.warn("Error discarding output" + StringUtils.stringifyException(ie)); } Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=741192&r1=741191&r2=741192&view=diff ============================================================================== --- 
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Thu Feb 5 17:24:11 2009 @@ -110,7 +110,7 @@ JobConf confForThisTask = new JobConf(conf); confForThisTask.set("mapred.local.dir", localDir);//set the localDir - Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask); + Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false); while (pidFilePath == null) { //wait till the pid file is created try { @@ -119,7 +119,7 @@ LOG.warn("sleep is interrupted:" + ie); break; } - pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask); + pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false); } pid = ProcessTree.getPidFromPidFile(pidFilePath.toString()); Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=741192&r1=741191&r2=741192&view=diff ============================================================================== --- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original) +++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Thu Feb 5 17:24:11 2009 @@ -67,13 +67,19 @@ } } } - TaskStatus[] ts = (job != null) ? 
tracker.getTaskStatuses(tipidObj) - : null; + TaskInProgress tip = null; + if (job != null && tipidObj != null) { + tip = job.getTaskInProgress(tipidObj); + } + TaskStatus[] ts = null; + if (tip != null) { + ts = tip.getTaskStatuses(); + } boolean isCleanupOrSetup = false; - if (tipidObj != null) { - isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask(); + if ( tip != null) { + isCleanupOrSetup = tip.isJobCleanupTask(); if (!isCleanupOrSetup) { - isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask(); + isCleanupOrSetup = tip.isJobSetupTask(); } } %> @@ -115,14 +121,41 @@ TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName); out.print("<tr><td>" + status.getTaskID() + "</td>"); String taskAttemptTracker = null; + String cleanupTrackerName = null; + TaskTrackerStatus cleanupTracker = null; + String cleanupAttemptTracker = null; + boolean hasCleanupAttempt = false; + if (tip != null && tip.isCleanupAttempt(status.getTaskID())) { + cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID()); + cleanupTracker = tracker.getTaskTracker(cleanupTrackerName); + if (cleanupTracker != null) { + cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":" + + cleanupTracker.getHttpPort(); + } + hasCleanupAttempt = true; + } + out.print("<td>"); + if (hasCleanupAttempt) { + out.print("Task attempt: "); + } if (taskTracker == null) { - out.print("<td>" + taskTrackerName + "</td>"); + out.print(taskTrackerName); } else { taskAttemptTracker = "http://" + taskTracker.getHost() + ":" + taskTracker.getHttpPort(); - out.print("<td><a href=\"" + taskAttemptTracker + "\">" - + tracker.getNode(taskTracker.getHost()) + "</a></td>"); + out.print("<a href=\"" + taskAttemptTracker + "\">" + + tracker.getNode(taskTracker.getHost()) + "</a>"); + } + if (hasCleanupAttempt) { + out.print("<br/>Cleanup Attempt: "); + if (cleanupAttemptTracker == null ) { + out.print(cleanupTrackerName); + } else { + out.print("<a href=\"" + 
cleanupAttemptTracker + "\">" + + tracker.getNode(cleanupTracker.getHost()) + "</a>"); } + } + out.print("</td>"); out.print("<td>" + status.getRunState() + "</td>"); out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2) + ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>"); @@ -162,6 +195,9 @@ String.valueOf(taskTracker.getHttpPort()), status.getTaskID().toString()); } + if (hasCleanupAttempt) { + out.print("Task attempt: <br/>"); + } if (taskLogUrl == null) { out.print("n/a"); } else { @@ -172,6 +208,25 @@ out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>"); out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>"); } + if (hasCleanupAttempt) { + out.print("Cleanup attempt: <br/>"); + taskLogUrl = null; + if (cleanupTracker != null ) { + taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(), + String.valueOf(cleanupTracker.getHttpPort()), + status.getTaskID().toString()); + } + if (taskLogUrl == null) { + out.print("n/a"); + } else { + String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true"; + String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true"; + String entireLogUrl = taskLogUrl + "&all=true&cleanup=true"; + out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>"); + out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>"); + out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>"); + } + } out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid + "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">" + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");
