Author: acmurthy Date: Wed Oct 10 02:32:49 2007 New Revision: 583408 URL: http://svn.apache.org/viewvc?rev=583408&view=rev Log: HADOOP-1874. Move task-outputs' promotion/discard to a separate thread distinct from the main heartbeat-processing thread. The main upside is that we do not lock up the JobTracker during HDFS operations, which could otherwise lead to lost tasktrackers if the NameNode is unresponsive. Contributed by Devaraj Das.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 10 02:32:49 2007 @@ -253,6 +253,12 @@ HADOOP-1992. Fix the performance degradation in the sort validator. (acmurthy via omalley) + HADOOP-1874. Move task-outputs' promotion/discard to a separate thread + distinct from the main heartbeat-processing thread. The main upside being + that we do not lock-up the JobTracker during HDFS operations, which + otherwise may lead to lost tasktrackers if the NameNode is unresponsive. + (Devaraj Das via acmurthy) + IMPROVEMENTS HADOOP-1908. 
Restructure data node code so that block sending and Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 10 02:32:49 2007 @@ -37,7 +37,6 @@ import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.util.StringUtils; /************************************************************* * JobInProgress maintains all the info for keeping @@ -380,6 +379,16 @@ TaskTrackerStatus ttStatus = this.jobtracker.getTaskTracker(status.getTaskTracker()); String httpTaskLogLocation = null; + + if (state == TaskStatus.State.COMMIT_PENDING || + state == TaskStatus.State.FAILED || + state == TaskStatus.State.KILLED) { + JobWithTaskContext j = new JobWithTaskContext(this, tip, + status.getTaskId(), + metrics); + jobtracker.addToCommitQueue(j); + } + if (null != ttStatus){ httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" + ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" + @@ -388,7 +397,7 @@ TaskCompletionEvent taskEvent = null; if (state == TaskStatus.State.SUCCEEDED) { - boolean complete = false; + completedTask(tip, status, metrics); taskEvent = new TaskCompletionEvent( taskCompletionEventTracker, status.getTaskId(), @@ -397,28 +406,14 @@ TaskCompletionEvent.Status.SUCCEEDED, httpTaskLogLocation ); - try { - complete = completedTask(tip, status, metrics); - } catch (IOException ioe) { - // Oops! Failed to copy the task's output to its final place; - // fail the task! 
- failedTask(tip, status.getTaskId(), - "Failed to copy reduce's output", - (tip.isMapTask() ? - TaskStatus.Phase.MAP : - TaskStatus.Phase.REDUCE), - TaskStatus.State.FAILED, - status.getTaskTracker(), null); - LOG.info("Failed to copy the output of " + status.getTaskId() + - " with: " + StringUtils.stringifyException(ioe)); - return; - } - - if (complete) { - tip.setSuccessEventNumber(taskCompletionEventTracker); - } else { - taskEvent.setTaskStatus(TaskCompletionEvent.Status.KILLED); - } + tip.setSuccessEventNumber(taskCompletionEventTracker); + } + //For a failed task update the JT datastructures.For the task state where + //only the COMMIT is pending, delegate everything to the JT thread. For + //failed tasks we want the JT to schedule a reexecution ASAP (and not go + //via the queue for the datastructures' updates). + else if (state == TaskStatus.State.COMMIT_PENDING) { + return; } else if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) { // Get the event number for the (possibly) previously successful @@ -771,7 +766,7 @@ public synchronized boolean completedTask(TaskInProgress tip, TaskStatus status, JobTrackerMetrics metrics) - throws IOException { + { String taskid = status.getTaskId(); // Sanity check: is the TIP already complete? 
@@ -928,14 +923,10 @@ TaskStatus status, String trackerName, boolean wasRunning, boolean wasComplete, JobTrackerMetrics metrics) { - if(status.getRunState() == TaskStatus.State.KILLED ) { - tip.taskKilled(taskid, trackerName, this.status); - } - else { - // Mark the taskid as a 'failure' - tip.incompleteSubTask(taskid, trackerName, this.status); - } - + + // Mark the taskid as FAILED or KILLED + tip.incompleteSubTask(taskid, trackerName, this.status); + boolean isRunning = tip.isRunning(); boolean isComplete = tip.isComplete(); @@ -1065,7 +1056,7 @@ reason, reason, trackerName, phase, - tip.getCounters()); + null); updateTaskStatus(tip, status, metrics); JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), @@ -1176,6 +1167,32 @@ TaskStatus.State.FAILED, trackerName, metrics); mapTaskIdToFetchFailuresMap.remove(mapTaskId); + } + } + + static class JobWithTaskContext { + private JobInProgress job; + private TaskInProgress tip; + private String taskId; + private JobTrackerMetrics metrics; + JobWithTaskContext(JobInProgress job, TaskInProgress tip, + String taskId, JobTrackerMetrics metrics) { + this.job = job; + this.tip = tip; + this.taskId = taskId; + this.metrics = metrics; + } + JobInProgress getJob() { + return job; + } + TaskInProgress getTIP() { + return tip; + } + String getTaskId() { + return taskId; + } + JobTrackerMetrics getJobTrackerMetrics() { + return metrics; } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Oct 10 02:32:49 2007 @@ -39,6 +39,7 @@ import 
java.util.TreeMap; import java.util.TreeSet; import java.util.Vector; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -609,6 +610,8 @@ Path systemDir = null; private JobConf conf; + private Thread taskCommitThread; + /** * Start the JobTracker process, listen on the indicated port */ @@ -663,15 +666,6 @@ myMetrics = new JobTrackerMetrics(this, jobConf); - this.expireTrackersThread = new Thread(this.expireTrackers, - "expireTrackers"); - this.expireTrackersThread.start(); - this.retireJobsThread = new Thread(this.retireJobs, "retireJobs"); - this.retireJobsThread.start(); - this.initJobsThread = new Thread(this.initJobs, "initJobs"); - this.initJobsThread.start(); - expireLaunchingTaskThread.start(); - // The rpc/web-server ports can be ephemeral ports... // ... ensure we have the correct info this.port = interTrackerServer.getListenerAddress().getPort(); @@ -726,6 +720,17 @@ * Run forever */ public void offerService() throws InterruptedException { + this.expireTrackersThread = new Thread(this.expireTrackers, + "expireTrackers"); + this.expireTrackersThread.start(); + this.retireJobsThread = new Thread(this.retireJobs, "retireJobs"); + this.retireJobsThread.start(); + this.initJobsThread = new Thread(this.initJobs, "initJobs"); + this.initJobsThread.start(); + expireLaunchingTaskThread.start(); + this.taskCommitThread = new TaskCommitQueue(); + this.taskCommitThread.start(); + this.interTrackerServer.join(); LOG.info("Stopped interTrackerServer"); } @@ -781,6 +786,16 @@ ex.printStackTrace(); } } + if (this.taskCommitThread != null) { + LOG.info("Stopping TaskCommit thread"); + this.taskCommitThread.interrupt(); + try { + this.taskCommitThread.interrupt(); + this.taskCommitThread.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } LOG.info("stopped all jobtracker services"); return; } @@ -853,7 +868,8 @@ void markCompletedJob(JobInProgress job) { for 
(TaskInProgress tip : job.getMapTasks()) { for (TaskStatus taskStatus : tip.getTaskStatuses()) { - if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { + if (taskStatus.getRunState() != TaskStatus.State.RUNNING && + taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) { markCompletedTaskAttempt(taskStatus.getTaskTracker(), taskStatus.getTaskId()); } @@ -861,7 +877,8 @@ } for (TaskInProgress tip : job.getReduceTasks()) { for (TaskStatus taskStatus : tip.getTaskStatuses()) { - if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { + if (taskStatus.getRunState() != TaskStatus.State.RUNNING && + taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) { markCompletedTaskAttempt(taskStatus.getTaskTracker(), taskStatus.getTaskId()); } @@ -1836,6 +1853,146 @@ removeMarkedTasks(trackerName); } } + + public void addToCommitQueue(JobInProgress.JobWithTaskContext j) { + ((TaskCommitQueue)taskCommitThread).addToQueue(j); + } + //This thread takes care of things like moving outputs to their final + //locations & deleting temporary outputs + private class TaskCommitQueue extends Thread { + + private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue = + new LinkedBlockingQueue <JobInProgress.JobWithTaskContext>(); + + public TaskCommitQueue() { + setName("Task Commit Thread"); + setDaemon(true); + } + + public void addToQueue(JobInProgress.JobWithTaskContext j) { + while (!queue.add(j)) { + LOG.warn("Couldn't add to the Task Commit queue now. 
Will " + + "try again"); + try { + Thread.sleep(2000); + } catch (InterruptedException ie) {} + } + } + + public void run() { + while (!isInterrupted()) { + JobInProgress.JobWithTaskContext j; + try { + j = queue.take(); + } catch (InterruptedException ie) { + return; + } + JobInProgress job = j.getJob(); + TaskInProgress tip = j.getTIP(); + String taskid = j.getTaskId(); + JobTrackerMetrics metrics = j.getJobTrackerMetrics(); + Task t; + TaskStatus status; + boolean isTipComplete = false; + TaskStatus.State state; + synchronized (JobTracker.this) { + synchronized (job) { + synchronized (tip) { + status = tip.getTaskStatus(taskid); + t = tip.getTaskObject(taskid); + state = status.getRunState(); + isTipComplete = tip.isComplete(); + } + } + } + try { + //For COMMIT_PENDING tasks, we save the task output in the dfs + //as well as manipulate the JT datastructures to reflect a + //successful task. This guarantees that we don't declare a task + //as having succeeded until we have successfully completed the + //dfs operations. + //For failed tasks, we just do the dfs operations here. The + //datastructures updates is done earlier as soon as the failure + //is detected so that the JT can immediately schedule another + //attempt for that task. + if (state == TaskStatus.State.COMMIT_PENDING) { + if (!isTipComplete) { + t.saveTaskOutput(); + } + synchronized (JobTracker.this) { + //do a check for the case where after the task went to + //COMMIT_PENDING, it was lost. So although we would have + //saved the task output, we cannot declare it a SUCCESS. 
+ TaskStatus newStatus = null; + synchronized (job) { + synchronized (tip) { + status = tip.getTaskStatus(taskid); + if (!isTipComplete) { + if (status.getRunState() != + TaskStatus.State.COMMIT_PENDING) { + state = TaskStatus.State.KILLED; + } else { + state = TaskStatus.State.SUCCEEDED; + } + } else { + tip.addDiagnosticInfo(t.getTaskId(),"Already completed " + + "TIP"); + state = TaskStatus.State.KILLED; + + } + //create new status if required. If the state changed from + //COMMIT_PENDING to KILLED in the JobTracker, while we were + //saving the output,the JT would have called updateTaskStatus + //and we don't need to call it again + if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){ + newStatus = TaskStatus.createTaskStatus( + tip.isMapTask(), + taskid, + state == TaskStatus.State.SUCCEEDED ? 1.0f : 0.0f, + state, + status.getDiagnosticInfo(), + status.getStateString(), + status.getTaskTracker(), status.getPhase(), + status.getCounters()); + } + } + if (newStatus != null) { + job.updateTaskStatus(tip, newStatus, metrics); + } + } + } + } + } catch (IOException ioe) { + // Oops! Failed to copy the task's output to its final place; + // fail the task! + state = TaskStatus.State.FAILED; + synchronized (JobTracker.this) { + job.failedTask(tip, status.getTaskId(), + "Failed to rename output with the exception: " + + StringUtils.stringifyException(ioe), + (tip.isMapTask() ? 
+ TaskStatus.Phase.MAP : + TaskStatus.Phase.REDUCE), + TaskStatus.State.FAILED, + status.getTaskTracker(), null); + } + LOG.info("Failed to rename the output of " + status.getTaskId() + + " with: " + StringUtils.stringifyException(ioe)); + } + if (state == TaskStatus.State.FAILED || + state == TaskStatus.State.KILLED) { + try { + t.discardTaskOutput(); + } catch (IOException ioe) { + LOG.info("Failed to discard the output of task " + + status.getTaskId() + " with: " + + StringUtils.stringifyException(ioe)); + } + } + } + } + } + /** * Get the localized job file path on the job trackers local file system Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Oct 10 02:32:49 2007 @@ -438,21 +438,32 @@ private Path getFinalPath(Path jobOutputDir, Path taskOutput) { URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri()); - return new Path(jobOutputDir, relativePath.getPath()); + if (relativePath.getPath().length() > 0) { + return new Path(jobOutputDir, relativePath.getPath()); + } else { + return jobOutputDir; + } } private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput) throws IOException { if (fs.isFile(taskOutput)) { Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput); - fs.mkdirs(finalOutputPath.getParent()); if (!fs.rename(taskOutput, finalOutputPath)) { - throw new IOException("Failed to save output of task: " + - getTaskId()); + if (!fs.delete(finalOutputPath)) { + throw new IOException("Failed to delete earlier output of task: " + + getTaskId()); + } + if (!fs.rename(taskOutput, finalOutputPath)) { + throw new IOException("Failed 
to save output of task: " + + getTaskId()); + } } LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); } else if(fs.isDirectory(taskOutput)) { Path[] paths = fs.listPaths(taskOutput); + Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput); + fs.mkdirs(finalOutputPath); if (paths != null) { for (Path path : paths) { moveTaskOutputs(fs, jobOutputDir, path); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Oct 10 02:32:49 2007 @@ -104,7 +104,6 @@ // Map from taskId -> Task private Map<String, Task> tasks = new TreeMap<String, Task>(); - boolean savedTaskOutput = false; private TreeSet<String> machinesWhereFailed = new TreeSet<String>(); private TreeSet<String> tasksReportedClosed = new TreeSet<String>(); @@ -185,6 +184,15 @@ return partition; } + public boolean isOnlyCommitPending() { + for (TaskStatus t : taskStatuses.values()) { + if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) { + return true; + } + } + return false; + } + /** * Initialization common to Map and Reduce */ @@ -219,6 +227,15 @@ } /** + * Return the Task object associated with a taskId + * @param taskId + * @return + */ + public Task getTaskObject(String taskId) { + return tasks.get(taskId); + } + + /** * Is this tip currently running any tasks? 
* @return true if any tasks are running */ @@ -231,7 +248,7 @@ * * @return <code>true</code> if the tip is complete, else <code>false</code> */ - public boolean isComplete() { + public synchronized boolean isComplete() { return (completes > 0); } @@ -350,7 +367,7 @@ * @param taskId id of the task * @param diagInfo diagnostic information for the task */ - private void addDiagnosticInfo(String taskId, String diagInfo) { + public void addDiagnosticInfo(String taskId, String diagInfo) { List<String> diagHistory = taskDiagnosticData.get(taskId); if (diagHistory == null) { diagHistory = new ArrayList<String>(); @@ -396,7 +413,8 @@ if (newState == TaskStatus.State.RUNNING && (oldState == TaskStatus.State.FAILED || oldState == TaskStatus.State.KILLED || - oldState == TaskStatus.State.SUCCEEDED)) { + oldState == TaskStatus.State.SUCCEEDED || + oldState == TaskStatus.State.COMMIT_PENDING)) { return false; } @@ -419,10 +437,18 @@ // // Note the failure and its location // - LOG.info("Task '" + taskid + "' has been lost."); TaskStatus status = taskStatuses.get(taskid); TaskStatus.State taskState = TaskStatus.State.FAILED; if (status != null) { + // Check if the user manually KILLED/FAILED this task-attempt... + Boolean shouldFail = tasksToKill.remove(taskid); + if (shouldFail != null) { + taskState = (shouldFail) ? 
TaskStatus.State.FAILED : + TaskStatus.State.KILLED; + status.setRunState(taskState); + addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" ); + } + taskState = status.getRunState(); if (taskState != TaskStatus.State.FAILED && taskState != TaskStatus.State.KILLED) { @@ -441,24 +467,18 @@ this.activeTasks.remove(taskid); // Since we do not fail completed reduces (whose outputs go to hdfs), we - // should note this failure only for completed maps; however if the job - // is done, there is no need to manipulate completed maps - if (this.completes > 0 && this.isMapTask() && + // should note this failure only for completed maps, and only if this taskid + // completed this map. However, if the job is done, there is no need to + // manipulate completed maps + if (this.isMapTask() && isComplete(taskid) && jobStatus.getRunState() != JobStatus.SUCCEEDED) { this.completes--; } - // Discard task output - Task t = tasks.get(taskid); - try { - t.discardTaskOutput(); - } catch (IOException ioe) { - LOG.info("Failed to discard output of task '" + taskid + "' with " + - StringUtils.stringifyException(ioe)); - } if (taskState == TaskStatus.State.FAILED) { numTaskFailures++; + machinesWhereFailed.add(trackerName); } else { numKilledTasks++; } @@ -467,7 +487,6 @@ LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times."); kill(); } - machinesWhereFailed.add(trackerName); } /** @@ -490,14 +509,6 @@ * taskid as {@link TaskStatus.State.KILLED}. */ void alreadyCompletedTask(String taskid) { - Task t = tasks.get(taskid); - try { - t.discardTaskOutput(); - } catch (IOException ioe) { - LOG.info("Failed to discard output of task '" + taskid + "' with " + - StringUtils.stringifyException(ioe)); - } - // 'KILL' the task completedTask(taskid, TaskStatus.State.KILLED); @@ -512,29 +523,11 @@ * Indicate that one of the taskids in this TaskInProgress * has successfully completed!
*/ - public void completed(String taskid) throws IOException { - // - // Finalize the task's output - // - Task t = tasks.get(taskid); - if (!savedTaskOutput) { - t.saveTaskOutput(); - savedTaskOutput = true; - } else { - try { - t.discardTaskOutput(); - } catch (IOException ioe) { - LOG.info("Failed to discard 'already-saved' output of task: " + - t.getTaskId() + " with: " + - StringUtils.stringifyException(ioe)); - } - } - + public void completed(String taskid) { // // Record that this taskid is complete // completedTask(taskid, TaskStatus.State.SUCCEEDED); - // // Now that the TIP is complete, the other speculative @@ -545,7 +538,6 @@ this.completes++; recomputeProgress(); - LOG.info("Task '" + taskid + "' has completed succesfully"); } /** @@ -588,7 +580,8 @@ */ boolean killTask(String taskId, boolean shouldFail) { TaskStatus st = taskStatuses.get(taskId); - if(st != null && st.getRunState() == TaskStatus.State.RUNNING + if(st != null && (st.getRunState() == TaskStatus.State.RUNNING + || st.getRunState() == TaskStatus.State.COMMIT_PENDING) && tasksToKill.put(taskId, shouldFail) == null ) { String logStr = "Request received to " + (shouldFail ? 
"fail" : "kill") + " task '" + taskId + "' by user"; @@ -599,32 +592,6 @@ return false; } - /** Notification that a task with the given id has been killed */ - void taskKilled(String taskId, String trackerName, JobStatus jobStatus) { - Boolean shouldFail = tasksToKill.remove(taskId); - if(shouldFail != null && !shouldFail) { - LOG.info("Task '" + taskId + "' has been killed"); - this.activeTasks.remove(taskId); - taskStatuses.get(taskId).setRunState(TaskStatus.State.KILLED ); - addDiagnosticInfo(taskId, "Task has been killed" ); - // Discard task output - Task t = tasks.get(taskId); - try { - t.discardTaskOutput(); - } catch (IOException ioe) { - LOG.info("Failed to discard output of task '" + taskId + "' with " + - StringUtils.stringifyException(ioe)); - } - numKilledTasks++; - - } - else { - //set the task status as failed. - taskStatuses.get(taskId).setRunState(TaskStatus.State.FAILED); - incompleteSubTask(taskId, trackerName, jobStatus); - } - } - /** * This method is called whenever there's a status change * for one of the TIP's sub-tasks. 
It recomputes the overall @@ -650,6 +617,12 @@ bestState = status.getStateString(); bestCounters = status.getCounters(); break; + } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) { + //for COMMIT_PENDING, we take the last state that we recorded + //when the task was RUNNING + bestProgress = this.progress; + bestState = this.state; + bestCounters = this.counters; } else if (status.getRunState() == TaskStatus.State.RUNNING) { if (status.getProgress() >= bestProgress) { bestProgress = status.getProgress(); @@ -692,7 +665,7 @@ runSpeculative && (averageProgress - progress >= SPECULATIVE_GAP) && (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) - && completes == 0) { + && completes == 0 && !isOnlyCommitPending()) { return true; } return false; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Wed Oct 10 02:32:49 2007 @@ -40,7 +40,8 @@ public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE} // what state is the task in? 
- public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED} + public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, + COMMIT_PENDING} private String taskid; private float progress; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Oct 10 02:32:49 2007 @@ -1368,7 +1368,7 @@ * The task is reporting that it's done running */ public synchronized void reportDone() { - this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + this.taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING); this.taskStatus.setProgress(1.0f); this.taskStatus.setFinishTime(System.currentTimeMillis()); this.done = true; @@ -1400,7 +1400,7 @@ boolean needCleanup = false; synchronized (this) { if (done) { - taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING); } else { if (!wasKilled) { failures += 1; @@ -1477,7 +1477,10 @@ */ private synchronized void mapOutputLost(String failure ) throws IOException { - if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) { + //The check for COMMIT_PENDING should actually be a check for SUCCESS + //however for that, we have to introduce another Action type from the + //JT to the TT (SuccessTaskAction in the lines of KillTaskAction). 
+ if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) { // change status to failure LOG.info("Reporting output lost:"+task.getTaskId()); taskStatus.setRunState(TaskStatus.State.FAILED); Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?rev=583408&r1=583407&r2=583408&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Wed Oct 10 02:32:49 2007 @@ -155,7 +155,7 @@ } out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid + "&tipid=" + tipid + "&taskid=" + status.getTaskId() + "\">" - + status.getCounters().size() + "</a></td>"); + + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>"); out.print("<td>"); if (privateActions && status.getRunState() == TaskStatus.State.RUNNING) {