Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=741203&r1=741202&r2=741203&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Thu Feb 5 17:39:27 2009 @@ -41,7 +41,7 @@ // what state is the task in? public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, - COMMIT_PENDING} + COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN} private final TaskAttemptID taskid; private float progress; @@ -204,6 +204,12 @@ } this.phase = phase; } + + boolean inTaskCleanupPhase() { + return (this.phase == TaskStatus.Phase.CLEANUP && + (this.runState == TaskStatus.State.FAILED_UNCLEAN || + this.runState == TaskStatus.State.KILLED_UNCLEAN)); + } public boolean getIncludeCounters() { return includeCounters; @@ -261,9 +267,9 @@ /** * Update the status of the task. * + * @param runstate * @param progress * @param state - * @param phase * @param counters */ synchronized void statusUpdate(State runState, @@ -300,7 +306,33 @@ this.counters = status.getCounters(); this.outputSize = status.outputSize; } - + + /** + * Update specific fields of task status + * + * This update is done in JobTracker when a cleanup attempt of task + * reports its status. Then update only specific fields, not all. 
+ * + * @param runState + * @param progress + * @param state + * @param phase + * @param finishTime + */ + synchronized void statusUpdate(State runState, + float progress, + String state, + Phase phase, + long finishTime) { + setRunState(runState); + setProgress(progress); + setStateString(state); + setPhase(phase); + if (finishTime != 0) { + this.finishTime = finishTime; + } + } + /** * Clear out transient information after sending out a status-update * from either the {@link Task} to the {@link TaskTracker} or from the
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=741203&r1=741202&r2=741203&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 5 17:39:27 2009 @@ -182,7 +182,8 @@ private static final String SUBDIR = "taskTracker"; private static final String CACHEDIR = "archive"; private static final String JOBCACHE = "jobcache"; - private static final String PIDDIR = "pids"; + private static final String PID = "pid"; + private static final String OUTPUT = "output"; private JobConf originalConf; private JobConf fConf; private int maxCurrentMapTasks; @@ -412,10 +413,36 @@ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE; } - static String getPidFilesSubdir() { - return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR; + static String getLocalJobDir(String jobid) { + return getJobCacheSubdir() + Path.SEPARATOR + jobid; } - + + static String getLocalTaskDir(String jobid, String taskid) { + return getLocalTaskDir(jobid, taskid, false) ; + } + + static String getIntermediateOutputDir(String jobid, String taskid) { + return getLocalTaskDir(jobid, taskid) + + Path.SEPARATOR + TaskTracker.OUTPUT ; + } + + static String getLocalTaskDir(String jobid, + String taskid, + boolean isCleanupAttempt) { + String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid; + if (isCleanupAttempt) { + taskDir = taskDir + ".cleanup"; + } + return taskDir; + } + + static String getPidFile(String jobid, + String taskid, + boolean isCleanup) { + return getLocalTaskDir(jobid, taskid, isCleanup) + + Path.SEPARATOR + PID; + } + public long getProtocolVersion(String protocol, long 
clientVersion) throws IOException { if (protocol.equals(TaskUmbilicalProtocol.class.getName())) { @@ -756,9 +783,9 @@ } catch(FileNotFoundException fe) { jobFileSize = -1; } - Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir() - + Path.SEPARATOR + jobId - + Path.SEPARATOR + "job.xml"), + Path localJobFile = lDirAlloc.getLocalPathForWrite( + getLocalJobDir(jobId.toString()) + + Path.SEPARATOR + "job.xml", jobFileSize, fConf); RunningJob rjob = addTaskToJob(jobId, tip); synchronized (rjob) { @@ -782,9 +809,9 @@ // create the 'work' directory // job-specific shared directory for use as scratch space - Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir() - + Path.SEPARATOR + jobId - + Path.SEPARATOR + "work"), fConf); + Path workDir = lDirAlloc.getLocalPathForWrite( + (getLocalJobDir(jobId.toString()) + + Path.SEPARATOR + "work"), fConf); if (!localFs.mkdirs(workDir)) { throw new IOException("Mkdirs failed to create " + workDir.toString()); @@ -806,8 +833,7 @@ // Here we check for and we check five times the size of jarFileSize // to accommodate for unjarring the jar file in work directory localJarFile = new Path(lDirAlloc.getLocalPathForWrite( - getJobCacheSubdir() - + Path.SEPARATOR + jobId + getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "jars", 5 * jarFileSize, fConf), "job.jar"); if (!localFs.mkdirs(localJarFile.getParent())) { @@ -1224,7 +1250,8 @@ for (TaskStatus taskStatus : status.getTaskReports()) { if (taskStatus.getRunState() != TaskStatus.State.RUNNING && taskStatus.getRunState() != TaskStatus.State.UNASSIGNED && - taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) { + taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && + !taskStatus.inTaskCleanupPhase()) { if (taskStatus.getIsMap()) { mapTotal--; } else { @@ -1341,7 +1368,8 @@ long now = System.currentTimeMillis(); for (TaskInProgress tip: runningTasks.values()) { if (tip.getRunState() == TaskStatus.State.RUNNING || - tip.getRunState() == 
TaskStatus.State.COMMIT_PENDING) { + tip.getRunState() == TaskStatus.State.COMMIT_PENDING || + tip.isCleaningup()) { // Check the per-job timeout interval for tasks; // an interval of '0' implies it is never timed-out long jobTaskTimeout = tip.getTaskTimeout(); @@ -1395,8 +1423,7 @@ // task if the job is done/failed if (!rjob.keepJobFiles){ directoryCleanupThread.addToQueue(getLocalFiles(fConf, - SUBDIR + Path.SEPARATOR + JOBCACHE + - Path.SEPARATOR + rjob.getJobID())); + getLocalJobDir(rjob.getJobID().toString()))); } // Remove this job rjob.tasks.clear(); @@ -1650,7 +1677,9 @@ } synchronized (tip) { //to make sure that there is no kill task action for this - if (tip.getRunState() != TaskStatus.State.UNASSIGNED) { + if (tip.getRunState() != TaskStatus.State.UNASSIGNED && + tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN && + tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) { //got killed externally while still in the launcher queue addFreeSlot(); continue; @@ -1671,7 +1700,8 @@ private TaskInProgress registerTask(LaunchTaskAction action, TaskLauncher launcher) { Task t = action.getTask(); - LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID()); + LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() + + " task's state:" + t.getState()); TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher); synchronized (this) { tasks.put(t.getTaskID(), tip); @@ -1693,10 +1723,6 @@ private void startNewTask(TaskInProgress tip) { try { localizeJob(tip); - if (isTaskMemoryManagerEnabled()) { - taskMemoryManager.addTask(tip.getTask().getTaskID(), - getVirtualMemoryForTask(tip.getJobConf())); - } } catch (Throwable e) { String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils.stringifyException(e)); @@ -1717,7 +1743,23 @@ } } } - + + void addToMemoryManager(TaskAttemptID attemptId, + JobConf conf, + String pidFile) { + if (isTaskMemoryManagerEnabled()) { + taskMemoryManager.addTask(attemptId, + 
getVirtualMemoryForTask(conf), pidFile); + } + } + + void removeFromMemoryManager(TaskAttemptID attemptId) { + // Remove the entry from taskMemoryManagerThread's data structures. + if (isTaskMemoryManagerEnabled()) { + taskMemoryManager.removeTask(attemptId); + } + } + /** * The server retry loop. * This while-loop attempts to connect to the JobTracker. It only @@ -1804,10 +1846,12 @@ localJobConf = null; taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 0.0f, - TaskStatus.State.UNASSIGNED, + task.getState(), diagnosticInfo.toString(), "initializing", getName(), + task.isTaskCleanupTask() ? + TaskStatus.Phase.CLEANUP : task.isMapTask()? TaskStatus.Phase.MAP: TaskStatus.Phase.SHUFFLE, task.getCounters()); @@ -1817,9 +1861,10 @@ private void localizeTask(Task task) throws IOException{ Path localTaskDir = - lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + - Path.SEPARATOR + task.getJobID() + Path.SEPARATOR + - task.getTaskID()), defaultJobConf ); + lDirAlloc.getLocalPathForWrite( + TaskTracker.getLocalTaskDir(task.getJobID().toString(), + task.getTaskID().toString(), task.isTaskCleanupTask()), + defaultJobConf ); FileSystem localFs = FileSystem.getLocal(fConf); if (!localFs.mkdirs(localTaskDir)) { @@ -1829,8 +1874,7 @@ // create symlink for ../work if it already doesnt exist String workDir = lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() - + Path.SEPARATOR + task.getJobID() + TaskTracker.getLocalJobDir(task.getJobID().toString()) + Path.SEPARATOR + "work", defaultJobConf).toString(); String link = localTaskDir.getParent().toString() @@ -1841,11 +1885,10 @@ // create the working-directory of the task Path cwd = lDirAlloc.getLocalPathForWrite( - TaskTracker.getJobCacheSubdir() - + Path.SEPARATOR + task.getJobID() - + Path.SEPARATOR + task.getTaskID() - + Path.SEPARATOR + MRConstants.WORKDIR, - defaultJobConf); + getLocalTaskDir(task.getJobID().toString(), + task.getTaskID().toString(), 
task.isTaskCleanupTask()) + + Path.SEPARATOR + MRConstants.WORKDIR, + defaultJobConf); if (!localFs.mkdirs(cwd)) { throw new IOException("Mkdirs failed to create " + cwd.toString()); @@ -1943,9 +1986,13 @@ * Kick off the task execution */ public synchronized void launchTask() throws IOException { - if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { + if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED || + this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN || + this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) { localizeTask(task); - this.taskStatus.setRunState(TaskStatus.State.RUNNING); + if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { + this.taskStatus.setRunState(TaskStatus.State.RUNNING); + } this.runner = task.createRunner(TaskTracker.this, this); this.runner.start(); this.taskStatus.setStartTime(System.currentTimeMillis()); @@ -1955,6 +2002,10 @@ } } + boolean isCleaningup() { + return this.taskStatus.inTaskCleanupPhase(); + } + /** * The task is reporting its progress */ @@ -1962,10 +2013,14 @@ { LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + "% " + taskStatus.getStateString()); - + // task will report its state as + // COMMIT_PENDING when it is waiting for commit response and + // when it is committing. 
+ // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN if (this.done || (this.taskStatus.getRunState() != TaskStatus.State.RUNNING && - this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) { + this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && + !isCleaningup())) { //make sure we ignore progress messages after a task has //invoked TaskUmbilicalProtocol.done() or if the task has been //KILLED/FAILED @@ -2016,7 +2071,16 @@ * The task is reporting that it's done running */ public synchronized void reportDone() { - this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + if (isCleaningup()) { + if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { + this.taskStatus.setRunState(TaskStatus.State.FAILED); + } else if (this.taskStatus.getRunState() == + TaskStatus.State.KILLED_UNCLEAN) { + this.taskStatus.setRunState(TaskStatus.State.KILLED); + } + } else { + this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + } this.taskStatus.setProgress(1.0f); this.taskStatus.setFinishTime(System.currentTimeMillis()); this.done = true; @@ -2031,6 +2095,11 @@ return wasKilled; } + void reportTaskFinished() { + taskFinished(); + releaseSlot(); + } + /** * The task has actually finished running. 
*/ @@ -2057,7 +2126,23 @@ if (!done) { if (!wasKilled) { failures += 1; - taskStatus.setRunState(TaskStatus.State.FAILED); + /* State changes: + * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED + * FAILED_UNCLEAN -> FAILED + * KILLED_UNCLEAN -> KILLED + */ + if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.FAILED); + } else if (taskStatus.getRunState() == + TaskStatus.State.KILLED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.KILLED); + } else if (task.isMapOrReduce() && + taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) { + taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN); + } else { + taskStatus.setRunState(TaskStatus.State.FAILED); + } + removeFromMemoryManager(task.getTaskID()); // call the script here for the failed tasks. if (debugCommand != null) { String taskStdout =""; @@ -2083,9 +2168,10 @@ File workDir = null; try { workDir = new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() - + Path.SEPARATOR + task.getJobID() - + Path.SEPARATOR + task.getTaskID() + TaskTracker.getLocalTaskDir( + task.getJobID().toString(), + task.getTaskID().toString(), + task.isTaskCleanupTask()) + Path.SEPARATOR + MRConstants.WORKDIR, localJobConf). 
toString()); } catch (IOException e) { @@ -2138,14 +2224,14 @@ LOG.warn("Exception in add diagnostics!"); } } - } else { - taskStatus.setRunState(TaskStatus.State.KILLED); } taskStatus.setProgress(0.0f); } this.taskStatus.setFinishTime(System.currentTimeMillis()); needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || - taskStatus.getRunState() == TaskStatus.State.KILLED); + taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN || + taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN || + taskStatus.getRunState() == TaskStatus.State.KILLED); } // @@ -2255,7 +2341,8 @@ synchronized(this){ if (getRunState() == TaskStatus.State.RUNNING || getRunState() == TaskStatus.State.UNASSIGNED || - getRunState() == TaskStatus.State.COMMIT_PENDING) { + getRunState() == TaskStatus.State.COMMIT_PENDING || + isCleaningup()) { kill(wasFailure); } } @@ -2269,16 +2356,38 @@ * @param wasFailure was it a failure (versus a kill request)? */ public synchronized void kill(boolean wasFailure) throws IOException { + /* State changes: + * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED + * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN + * FAILED_UNCLEAN -> FAILED + * KILLED_UNCLEAN -> KILLED + * UNASSIGNED -> FAILED/KILLED + */ if (taskStatus.getRunState() == TaskStatus.State.RUNNING || - taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) { + taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING || + isCleaningup()) { wasKilled = true; if (wasFailure) { failures += 1; } runner.kill(); - taskStatus.setRunState((wasFailure) ? - TaskStatus.State.FAILED : - TaskStatus.State.KILLED); + if (task.isMapOrReduce()) { + taskStatus.setRunState((wasFailure) ? 
+ TaskStatus.State.FAILED_UNCLEAN : + TaskStatus.State.KILLED_UNCLEAN); + } else { + // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always + if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.FAILED); + } else if (taskStatus.getRunState() == + TaskStatus.State.KILLED_UNCLEAN) { + taskStatus.setRunState(TaskStatus.State.KILLED); + } else { + taskStatus.setRunState((wasFailure) ? + TaskStatus.State.FAILED : + TaskStatus.State.KILLED); + } + } } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { if (wasFailure) { failures += 1; @@ -2287,6 +2396,7 @@ taskStatus.setRunState(TaskStatus.State.KILLED); } } + removeFromMemoryManager(task.getTaskID()); releaseSlot(); } @@ -2338,7 +2448,12 @@ synchronized (TaskTracker.this) { if (needCleanup) { - tasks.remove(taskId); + // see if tasks data structure is holding this tip. + // tasks could hold the tip for cleanup attempt, if cleanup attempt + // got launched before this method. + if (tasks.get(taskId) == this) { + tasks.remove(taskId); + } } synchronized (this){ if (alwaysKeepTaskFiles || @@ -2350,8 +2465,8 @@ } synchronized (this) { try { - String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR - + task.getJobID() + Path.SEPARATOR + taskId; + String taskDir = getLocalTaskDir(task.getJobID().toString(), + taskId.toString(), task.isTaskCleanupTask()); if (needCleanup) { if (runner != null) { //cleans up the output directory of the task (where map outputs @@ -2572,15 +2687,10 @@ } if (tip != null) { if (!commitPending) { - tip.taskFinished(); - // Remove the entry from taskMemoryManagerThread's data structures. - if (isTaskMemoryManagerEnabled()) { - taskMemoryManager.removeTask(taskid); - } - tip.releaseSlot(); + tip.reportTaskFinished(); } } else { - LOG.warn("Unknown child task finshed: "+taskid+". Ignored."); + LOG.warn("Unknown child task finished: "+taskid+". 
Ignored."); } } @@ -2807,15 +2917,13 @@ // Index file Path indexFileName = lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + - jobId + Path.SEPARATOR + - mapId + "/output" + "/file.out.index", conf); + TaskTracker.getIntermediateOutputDir(jobId, mapId) + + "/file.out.index", conf); // Map-output file Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + - jobId + Path.SEPARATOR + - mapId + "/output" + "/file.out", conf); + TaskTracker.getIntermediateOutputDir(jobId, mapId) + + "/file.out", conf); /** * Read the index file to get the information about where Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=741203&r1=741202&r2=741203&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Feb 5 17:39:27 2009 @@ -250,7 +250,8 @@ TaskStatus.State state = ts.getRunState(); if (ts.getIsMap() && ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED))) { + (state == TaskStatus.State.UNASSIGNED) || + ts.inTaskCleanupPhase())) { mapCount++; } } @@ -267,7 +268,8 @@ TaskStatus.State state = ts.getRunState(); if ((!ts.getIsMap()) && ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED))) { + (state == TaskStatus.State.UNASSIGNED) || + ts.inTaskCleanupPhase())) { reduceCount++; } } Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: 
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=741203&r1=741202&r2=741203&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Feb 5 17:39:27 2009 @@ -52,9 +52,10 @@ * encapsulates the events and whether to reset events index. * Version 13 changed the getTask method signature for HADOOP-249 * Version 14 changed the getTask method signature for HADOOP-4232 + * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759 * */ - public static final long versionID = 14L; + public static final long versionID = 15L; /** * Called when a child task process starts, to get its task. Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=741203&r1=741202&r2=741203&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Thu Feb 5 17:39:27 2009 @@ -174,8 +174,10 @@ @Override public void abortTask(TaskAttemptContext context) { try { - context.progress(); - outputFileSystem.delete(workPath, true); + if (workPath != null) { + context.progress(); + outputFileSystem.delete(workPath, true); + } } catch (IOException ie) { LOG.warn("Error discarding output" + StringUtils.stringifyException(ie)); } Added: 
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=741203&view=auto ============================================================================== --- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java (added) +++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java Thu Feb 5 17:39:27 2009 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred; + +import java.io.DataOutputStream; +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.lib.IdentityReducer; + +public class TestTaskFail extends TestCase { + public static class MapperClass extends MapReduceBase + implements Mapper<LongWritable, Text, Text, IntWritable> { + String taskid; + public void configure(JobConf job) { + taskid = job.get("mapred.task.id"); + } + public void map (LongWritable key, Text value, + OutputCollector<Text, IntWritable> output, + Reporter reporter) throws IOException { + if (taskid.endsWith("_0")) { + throw new IOException(); + } else if (taskid.endsWith("_1")) { + System.exit(-1); + } + } + } + + public RunningJob launchJob(JobConf conf, + Path inDir, + Path outDir, + String input) + throws IOException { + // set up the input file system and write input text. + FileSystem inFs = inDir.getFileSystem(conf); + FileSystem outFs = outDir.getFileSystem(conf); + outFs.delete(outDir, true); + if (!inFs.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + { + // write input into input file + DataOutputStream file = inFs.create(new Path(inDir, "part-0")); + file.writeBytes(input); + file.close(); + } + + // configure the mapred Job + conf.setMapperClass(MapperClass.class); + conf.setReducerClass(IdentityReducer.class); + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", + "/tmp")).toString().replace(' ', '+'); + conf.set("test.build.data", TEST_ROOT_DIR); + // return the RunningJob handle. 
+ return new JobClient(conf).submitJob(conf); + } + + public void testWithDFS() throws IOException { + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + FileSystem fileSys = null; + try { + final int taskTrackers = 4; + + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster(conf, 4, true, null); + fileSys = dfs.getFileSystem(); + mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1); + JobConf jobConf = mr.createJobConf(); + final Path inDir = new Path("./input"); + final Path outDir = new Path("./output"); + String input = "The quick brown fox\nhas many silly\nred fox sox\n"; + RunningJob job = null; + + job = launchJob(jobConf, inDir, outDir, input); + // wait for the job to finish. + while (!job.isComplete()); + assertEquals(JobStatus.SUCCEEDED, job.getJobState()); + + JobID jobId = job.getID(); + // construct the task id of first map task + TaskAttemptID attemptId = + new TaskAttemptID(new TaskID(jobId, true, 0), 0); + TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker(). 
+ getTip(attemptId.getTaskID()); + // this should not be cleanup attempt since the first attempt + // fails with an exception + assertTrue(!tip.isCleanupAttempt(attemptId)); + TaskStatus ts = + mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId); + assertTrue(ts != null); + assertEquals(TaskStatus.State.FAILED, ts.getRunState()); + + attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1); + // this should be cleanup attempt since the second attempt fails + // with System.exit + assertTrue(tip.isCleanupAttempt(attemptId)); + ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId); + assertTrue(ts != null); + assertEquals(TaskStatus.State.FAILED, ts.getRunState()); + + } finally { + if (dfs != null) { dfs.shutdown(); } + if (mr != null) { mr.shutdown(); } + } + } + + public static void main(String[] argv) throws Exception { + TestTaskFail td = new TestTaskFail(); + td.testWithDFS(); + } +} Modified: hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp?rev=741203&r1=741202&r2=741203&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp (original) +++ hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp Thu Feb 5 17:39:27 2009 @@ -67,13 +67,19 @@ } } } - TaskStatus[] ts = (job != null) ? 
tracker.getTaskStatuses(tipidObj) - : null; + TaskInProgress tip = null; + if (job != null && tipidObj != null) { + tip = job.getTaskInProgress(tipidObj); + } + TaskStatus[] ts = null; + if (tip != null) { + ts = tip.getTaskStatuses(); + } boolean isCleanupOrSetup = false; - if (tipidObj != null) { - isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask(); + if ( tip != null) { + isCleanupOrSetup = tip.isJobCleanupTask(); if (!isCleanupOrSetup) { - isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask(); + isCleanupOrSetup = tip.isJobSetupTask(); } } %> @@ -115,14 +121,41 @@ TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName); out.print("<tr><td>" + status.getTaskID() + "</td>"); String taskAttemptTracker = null; + String cleanupTrackerName = null; + TaskTrackerStatus cleanupTracker = null; + String cleanupAttemptTracker = null; + boolean hasCleanupAttempt = false; + if (tip != null && tip.isCleanupAttempt(status.getTaskID())) { + cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID()); + cleanupTracker = tracker.getTaskTracker(cleanupTrackerName); + if (cleanupTracker != null) { + cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":" + + cleanupTracker.getHttpPort(); + } + hasCleanupAttempt = true; + } + out.print("<td>"); + if (hasCleanupAttempt) { + out.print("Task attempt: "); + } if (taskTracker == null) { - out.print("<td>" + taskTrackerName + "</td>"); + out.print(taskTrackerName); } else { taskAttemptTracker = "http://" + taskTracker.getHost() + ":" + taskTracker.getHttpPort(); - out.print("<td><a href=\"" + taskAttemptTracker + "\">" - + tracker.getNode(taskTracker.getHost()) + "</a></td>"); + out.print("<a href=\"" + taskAttemptTracker + "\">" + + tracker.getNode(taskTracker.getHost()) + "</a>"); + } + if (hasCleanupAttempt) { + out.print("<br/>Cleanup Attempt: "); + if (cleanupAttemptTracker == null ) { + out.print(cleanupTrackerName); + } else { + out.print("<a href=\"" + 
cleanupAttemptTracker + "\">" + + tracker.getNode(cleanupTracker.getHost()) + "</a>"); } + } + out.print("</td>"); out.print("<td>" + status.getRunState() + "</td>"); out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2) + ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>"); @@ -162,6 +195,9 @@ String.valueOf(taskTracker.getHttpPort()), status.getTaskID().toString()); } + if (hasCleanupAttempt) { + out.print("Task attempt: <br/>"); + } if (taskLogUrl == null) { out.print("n/a"); } else { @@ -172,6 +208,25 @@ out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>"); out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>"); } + if (hasCleanupAttempt) { + out.print("Cleanup attempt: <br/>"); + taskLogUrl = null; + if (cleanupTracker != null ) { + taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(), + String.valueOf(cleanupTracker.getHttpPort()), + status.getTaskID().toString()); + } + if (taskLogUrl == null) { + out.print("n/a"); + } else { + String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true"; + String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true"; + String entireLogUrl = taskLogUrl + "&all=true&cleanup=true"; + out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>"); + out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>"); + out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>"); + } + } out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid + "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">" + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");
