Author: cutting Date: Thu Aug 23 09:36:00 2007 New Revision: 569063 URL: http://svn.apache.org/viewvc?rev=569063&view=rev Log: HADOOP-1158. Change JobTracker to record map-output transmission errors and use them to trigger speculative re-execution of tasks. Contributed by Arun.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Aug 23 09:36:00 2007 @@ -98,6 +98,10 @@ HADOOP-1654. Add IOUtils class, containing generic io-related utility methods. (Enis Soztutar via cutting) + HADOOP-1158. Change JobTracker to record map-output transmission + errors and use them to trigger speculative re-execution of tasks. 
+ (Arun C Murthy via cutting) + Release 0.14.0 - 2007-08-17 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Thu Aug 23 09:36:00 2007 @@ -51,6 +51,10 @@ LOG.info("Task " + taskId + " reporting file system error: " + message); } + public void shuffleError(String taskId, String message) throws IOException { + LOG.info("Task " + taskId + " reporting shuffle error: " + message); + } + public Task getTask(String taskid) throws IOException { return null; } @@ -59,14 +63,13 @@ return true; } - public boolean progress(String taskid, float progress, String state, - TaskStatus.Phase phase, Counters counters) - throws IOException - { + public boolean statusUpdate(String taskId, TaskStatus taskStatus) + throws IOException, InterruptedException { StringBuffer buf = new StringBuffer("Task "); - buf.append(taskid); + buf.append(taskId); buf.append(" making progress to "); - buf.append(progress); + buf.append(taskStatus.getProgress()); + String state = taskStatus.getStateString(); if (state != null) { buf.append(" and state of "); buf.append(state); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Aug 23 09:36:00 2007 @@ -110,6 
+110,14 @@ private MetricsRecord jobMetrics; + // Maximum no. of fetch-failure notifications after which + // the map task is killed + private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3; + + // Map of mapTaskId -> no. of fetch failures + private Map<String, Integer> mapTaskIdToFetchFailuresMap = + new TreeMap<String, Integer>(); + /** * Create a JobInProgress with the given job file, plus a handle * to the tracker. @@ -1034,14 +1042,14 @@ TaskStatus.Phase phase, TaskStatus.State state, String hostname, String trackerName, JobTrackerMetrics metrics) { - TaskStatus status = new TaskStatus(taskid, - tip.isMapTask(), - 0.0f, - state, - reason, - reason, - trackerName, phase, - tip.getCounters()); + TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), + taskid, + 0.0f, + state, + reason, + reason, + trackerName, phase, + tip.getCounters()); updateTaskStatus(tip, status, metrics); JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), @@ -1131,5 +1139,28 @@ fromEventId, actualMax + fromEventId).toArray(events); } return events; + } + + synchronized void fetchFailureNotification(TaskInProgress tip, + String mapTaskId, + String hostname, String trackerName, + JobTrackerMetrics metrics) { + Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId); + fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1); + mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures); + LOG.info("Failed fetch notification #" + fetchFailures + " for task " + + mapTaskId); + + if (fetchFailures == MAX_FETCH_FAILURES_NOTIFICATIONS) { + LOG.info("Too many fetch-failures for output of task: " + mapTaskId + + " ... killing it"); + + failedTask(tip, mapTaskId, "Too many fetch-failures", + (tip.isMapTask() ? 
TaskStatus.Phase.MAP : + TaskStatus.Phase.REDUCE), + TaskStatus.State.FAILED, hostname, trackerName, metrics); + + mapTaskIdToFetchFailuresMap.remove(mapTaskId); + } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Aug 23 09:36:00 2007 @@ -1708,8 +1708,9 @@ * jobs that might be affected. */ void updateTaskStatuses(TaskTrackerStatus status) { + String trackerName = status.getTrackerName(); for (TaskStatus report : status.getTaskReports()) { - report.setTaskTracker(status.getTrackerName()); + report.setTaskTracker(trackerName); String taskId = report.getTaskId(); TaskInProgress tip = taskidToTIPMap.get(taskId); if (tip == null) { @@ -1717,6 +1718,21 @@ } else { expireLaunchingTasks.removeTask(taskId); tip.getJob().updateTaskStatus(tip, report, myMetrics); + } + + // Process 'failed fetch' notifications + List<String> failedFetchMaps = report.getFetchFailedMaps(); + if (failedFetchMaps != null) { + for (String mapTaskId : failedFetchMaps) { + TaskInProgress failedFetchMap = taskidToTIPMap.get(mapTaskId); + if (failedFetchMap != null) { + failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, + mapTaskId, + status.getHost(), + trackerName, + myMetrics); + } + } } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Aug 23 09:36:00 2007 @@ -203,23 +203,23 @@ public Task getTask(String taskid) { return null; } - public boolean progress(String taskId, float progress, String state, - TaskStatus.Phase phase, Counters taskCounters) { - LOG.info(state); + public boolean statusUpdate(String taskId, TaskStatus taskStatus) + throws IOException, InterruptedException { + LOG.info(taskStatus.getStateString()); float taskIndex = mapIds.indexOf(taskId); if (taskIndex >= 0) { // mapping float numTasks = mapIds.size(); - status.setMapProgress(taskIndex/numTasks + progress/numTasks); + status.setMapProgress(taskIndex/numTasks + taskStatus.getProgress()/numTasks); } else { - status.setReduceProgress(progress); + status.setReduceProgress(taskStatus.getProgress()); } - currentCounters = Counters.sum(completedTaskCounters, taskCounters); + currentCounters = Counters.sum(completedTaskCounters, taskStatus.getCounters()); // ignore phase return true; } - + /** * Updates counters corresponding to completed tasks. 
* @param task A map or reduce task which has just been @@ -251,6 +251,10 @@ LOG.fatal("FSError: "+ message + "from task: " + taskId); } + public void shuffleError(String taskId, String message) throws IOException { + LOG.fatal("shuffleError: "+ message + "from task: " + taskId); + } + public TaskCompletionEvent[] getMapCompletionEvents( String jobId, int fromEventId, int maxLocs) throws IOException { return TaskCompletionEvent.EMPTY_ARRAY; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Aug 23 09:36:00 2007 @@ -67,7 +67,9 @@ setPhase(TaskStatus.Phase.MAP); } - public MapTask() {} + public MapTask() { + super(); + } public MapTask(String jobId, String jobFile, String tipId, String taskId, int partition, String splitClass, BytesWritable split Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java?rev=569063&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java Thu Aug 23 09:36:00 2007 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +public class MapTaskStatus extends TaskStatus { + + public MapTaskStatus() {} + + public MapTaskStatus(String taskid, float progress, + State runState, String diagnosticInfo, String stateString, + String taskTracker, Phase phase, Counters counters) { + super(taskid, progress, runState, diagnosticInfo, stateString, + taskTracker, phase, counters); + } + + public boolean getIsMap() { + return true; + } + + public long getShuffleFinishTime() { + throw new UnsupportedOperationException("getShuffleFinishTime() not supported for MapTask"); + } + + void setShuffleFinishTime(long shuffleFinishTime) { + throw new UnsupportedOperationException("setShuffleFinishTime() not supported for MapTask"); + } + + public long getSortFinishTime() { + throw new UnsupportedOperationException("getSortFinishTime() not supported for MapTask"); + } + + void setSortFinishTime(long sortFinishTime) { + throw new UnsupportedOperationException("setSortFinishTime() not supported for MapTask"); + } +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java 
(original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Aug 23 09:36:00 2007 @@ -28,6 +28,7 @@ import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Hashtable; import java.util.Iterator; @@ -36,7 +37,6 @@ import java.util.Random; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -90,7 +90,9 @@ private Progress sortPhase = getProgress().addPhase("sort"); private Progress reducePhase = getProgress().addPhase("reduce"); - public ReduceTask() {} + public ReduceTask() { + super(); + } public ReduceTask(String jobId, String jobFile, String tipId, String taskId, int partition, int numMaps) { @@ -467,6 +469,30 @@ private long ramfsMergeOutputSize; /** + * Maximum no. of fetch-retries per-map. + */ + private static final int MAX_FETCH_RETRIES_PER_MAP = 5; + + /** + * Maximum no. of unique maps from which we failed to fetch map-outputs + * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries; after this the + * reduce task is failed. + */ + private static final int MAX_FAILED_UNIQUE_FETCHES = 5; + + /** + * The maps from which we fail to fetch map-outputs + * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries. + */ + Set<Integer> fetchFailedMaps = new TreeSet<Integer>(); + + /** + * A map of taskId -> no. of failed fetches + */ + Map<String, Integer> mapTaskToFailedFetchesMap = + new HashMap<String, Integer>(); + + /** * This class contains the methods that should be used for metrics-reporting * the specific metrics for shuffle.
This class actually reports the * metrics for the shuffle client (the ReduceTask), and hence the name @@ -958,7 +984,11 @@ copyPhase.startNextPhase(); copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " + - mbpsFormat.format(transferRate) + " MB/s)"); + mbpsFormat.format(transferRate) + " MB/s)"); + + // Note successfull fetch for this mapId to invalidate + // (possibly) old fetch-failures + fetchFailedMaps.remove(cr.getLocation().getMapId()); } else if (cr.isObsolete()) { //ignore LOG.info(reduceTask.getTaskId() + @@ -968,10 +998,46 @@ } else { retryFetches.add(cr.getLocation()); + // note the failed-fetch + String mapTaskId = cr.getLocation().getMapTaskId(); + Integer mapId = cr.getLocation().getMapId(); + + Integer noFailedFetches = + mapTaskToFailedFetchesMap.get(mapTaskId); + noFailedFetches = + (noFailedFetches == null) ? 1 : (noFailedFetches + 1); + mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches); + LOG.info("Task " + getTaskId() + ": Failed fetch #" + + noFailedFetches + " from " + mapTaskId); + + // did the fetch fail too many times? + if ((noFailedFetches % MAX_FETCH_RETRIES_PER_MAP) == 0) { + synchronized (ReduceTask.this) { + taskStatus.addFetchFailedMap(mapTaskId); + LOG.info("Failed to fetch map-output from " + mapTaskId + + " even after MAX_FETCH_RETRIES_PER_MAP retries... " + + " reporting to the JobTracker"); + } + } + + // note unique failed-fetch maps + if (noFailedFetches == MAX_FETCH_RETRIES_PER_MAP) { + fetchFailedMaps.add(mapId); + + // did we have too many unique failed-fetch maps? + if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) { + LOG.fatal("Shuffle failed with too many fetch failures! 
" + + "Killing task " + getTaskId() + "."); + umbilical.shuffleError(getTaskId(), + "Exceeded MAX_FAILED_UNIQUE_FETCHES;" + + " bailing-out."); + } + } + // wait a random amount of time for next contact currentTime = System.currentTimeMillis(); - long nextContact = currentTime + 60 * 1000 + - backoff.nextInt(maxBackoff*1000); + long nextContact = currentTime + 60 * 1000 + + backoff.nextInt(maxBackoff*1000); penaltyBox.put(cr.getHost(), nextContact); LOG.warn(reduceTask.getTaskId() + " adding host " + cr.getHost() + " to penalty box, next contact in " + Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=569063&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Thu Aug 23 09:36:00 2007 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Text; + + +public class ReduceTaskStatus extends TaskStatus { + + private long shuffleFinishTime; + private long sortFinishTime; + private List<String> failedFetchTasks = new ArrayList<String>(1); + + public ReduceTaskStatus() {} + + public ReduceTaskStatus(String taskid, float progress, State runState, + String diagnosticInfo, String stateString, String taskTracker, + Phase phase, Counters counters) { + super(taskid, progress, runState, diagnosticInfo, stateString, taskTracker, + phase, counters); + } + + public Object clone() { + ReduceTaskStatus myClone = (ReduceTaskStatus)super.clone(); + myClone.failedFetchTasks = new ArrayList<String>(failedFetchTasks); + return myClone; + } + + public boolean getIsMap() { + return false; + } + + void setFinishTime(long finishTime) { + if (shuffleFinishTime == 0) { + this.shuffleFinishTime = finishTime; + } + if (sortFinishTime == 0){ + this.sortFinishTime = finishTime; + } + super.setFinishTime(finishTime); + } + + public long getShuffleFinishTime() { + return shuffleFinishTime; + } + + void setShuffleFinishTime(long shuffleFinishTime) { + this.shuffleFinishTime = shuffleFinishTime; + } + + public long getSortFinishTime() { + return sortFinishTime; + } + + void setSortFinishTime(long sortFinishTime) { + this.sortFinishTime = sortFinishTime; + if (0 == this.shuffleFinishTime){ + this.shuffleFinishTime = sortFinishTime; + } + } + + public List<String> getFetchFailedMaps() { + return failedFetchTasks; + } + + void addFetchFailedMap(String mapTaskId) { + failedFetchTasks.add(mapTaskId); + } + + synchronized void statusUpdate(TaskStatus status) { + super.statusUpdate(status); + + if (status.getShuffleFinishTime() != 0) { + this.shuffleFinishTime = status.getShuffleFinishTime(); + } + + if (status.getSortFinishTime() != 0) 
{ + sortFinishTime = status.getSortFinishTime(); + } + + List<String> newFetchFailedMaps = status.getFetchFailedMaps(); + if (failedFetchTasks == null) { + failedFetchTasks = newFetchFailedMaps; + } else if (newFetchFailedMaps != null){ + failedFetchTasks.addAll(newFetchFailedMaps); + } + } + + synchronized void clearStatus() { + super.clearStatus(); + failedFetchTasks.clear(); + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + shuffleFinishTime = in.readLong(); + sortFinishTime = in.readLong(); + int noFailedFetchTasks = in.readInt(); + failedFetchTasks = new ArrayList<String>(noFailedFetchTasks); + for (int i=0; i < noFailedFetchTasks; ++i) { + failedFetchTasks.add(Text.readString(in)); + } + } + + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeLong(shuffleFinishTime); + out.writeLong(sortFinishTime); + out.writeInt(failedFetchTasks.size()); + for (String taskId : failedFetchTasks) { + Text.writeString(out, taskId); + } + } + +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Aug 23 09:36:00 2007 @@ -83,7 +83,7 @@ private String jobId; // unique jobid private String tipId; private int partition; // id within job - private TaskStatus.Phase phase ; // current phase of the task + TaskStatus taskStatus; // current status of the task private Path taskOutputPath; // task-specific output dir protected JobConf conf; @@ -94,7 +94,9 @@ // Constructors //////////////////////////////////////////// - public Task() {} + public Task() { + taskStatus = TaskStatus.createTaskStatus(isMapTask()); + } public 
Task(String jobId, String jobFile, String tipId, String taskId, int partition) { @@ -103,6 +105,14 @@ this.jobId = jobId; this.tipId = tipId; this.partition = partition; + this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, + 0.0f, + TaskStatus.State.UNASSIGNED, + "", "", "", + isMapTask() ? + TaskStatus.Phase.MAP : + TaskStatus.Phase.SHUFFLE, + counters); } //////////////////////////////////////////// @@ -135,14 +145,14 @@ * @return */ public synchronized TaskStatus.Phase getPhase(){ - return this.phase; + return this.taskStatus.getPhase(); } /** * Set current phase of the task. * @param p */ - protected synchronized void setPhase(TaskStatus.Phase p){ - this.phase = p; + protected synchronized void setPhase(TaskStatus.Phase phase){ + this.taskStatus.setPhase(phase); } //////////////////////////////////////////// @@ -160,6 +170,7 @@ } else { Text.writeString(out, ""); } + taskStatus.write(out); } public void readFields(DataInput in) throws IOException { jobFile = UTF8.readString(in); @@ -173,6 +184,7 @@ } else { taskOutputPath = null; } + taskStatus.readFields(in); } public String toString() { return taskId; } @@ -276,8 +288,10 @@ if (sendProgress) { // we need to send progress update - taskFound = umbilical.progress(taskId, taskProgress.get(), - taskProgress.toString(), getPhase(), counters); + taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), + counters); + taskFound = umbilical.statusUpdate(taskId, taskStatus); + taskStatus.clearStatus(); } else { // send ping @@ -351,18 +365,21 @@ try { if (needProgress) { // send a final status report + taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), + counters); try { - if (!umbilical.progress(taskId, taskProgress.get(), - taskProgress.toString(), getPhase(), counters)) { + if (!umbilical.statusUpdate(getTaskId(), taskStatus)) { LOG.warn("Parent died. 
Exiting "+taskId); System.exit(66); } + taskStatus.clearStatus(); needProgress = false; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // interrupt ourself } } umbilical.done(taskId); + LOG.info("Task '" + getTaskId() + "' done."); return; } catch (IOException ie) { LOG.warn("Failure signalling completion: " + Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Aug 23 09:36:00 2007 @@ -302,8 +302,7 @@ !tasksReportedClosed.contains(taskid)){ tasksReportedClosed.add(taskid); return true; - } - else { + } else { return false; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Thu Aug 23 09:36:00 2007 @@ -20,7 +20,10 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -29,7 +32,10 @@ * not intended to be a comprehensive piece of data. 
* **************************************************/ -class TaskStatus implements Writable { +abstract class TaskStatus implements Writable, Cloneable { + static final Log LOG = + LogFactory.getLog(TaskStatus.class.getName()); + //enumeration for reporting current phase of a task. public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE} @@ -37,7 +43,6 @@ public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED} private String taskid; - private boolean isMap; private float progress; private State runState; private String diagnosticInfo; @@ -47,21 +52,16 @@ private long startTime; private long finishTime; - // only for reduce tasks - private long shuffleFinishTime; - private long sortFinishTime; - private Phase phase = Phase.STARTING; private Counters counters; public TaskStatus() {} - public TaskStatus(String taskid, boolean isMap, float progress, + public TaskStatus(String taskid, float progress, State runState, String diagnosticInfo, String stateString, String taskTracker, Phase phase, Counters counters) { this.taskid = taskid; - this.isMap = isMap; this.progress = progress; this.runState = runState; this.diagnosticInfo = diagnosticInfo; @@ -70,9 +70,9 @@ this.phase = phase; this.counters = counters; } - + public String getTaskId() { return taskid; } - public boolean getIsMap() { return isMap; } + public abstract boolean getIsMap(); public float getProgress() { return progress; } public void setProgress(float progress) { this.progress = progress; } public State getRunState() { return runState; } @@ -100,12 +100,6 @@ * @param finishTime finish time of task. */ void setFinishTime(long finishTime) { - if (shuffleFinishTime == 0) { - this.shuffleFinishTime = finishTime; - } - if (sortFinishTime == 0){ - this.sortFinishTime = finishTime; - } this.finishTime = finishTime; } /** @@ -117,16 +111,14 @@ * it returns approximate shuffle finish time. 
*/ public long getShuffleFinishTime() { - return shuffleFinishTime; + return 0; } /** * Set shuffle finish time. * @param shuffleFinishTime */ - void setShuffleFinishTime(long shuffleFinishTime) { - this.shuffleFinishTime = shuffleFinishTime; - } + void setShuffleFinishTime(long shuffleFinishTime) {} /** * Get sort finish time for the task,. If sort finish time was not set @@ -136,7 +128,7 @@ * finish time if that is set, else it returns finish time. */ public long getSortFinishTime() { - return sortFinishTime; + return 0; } /** @@ -144,12 +136,7 @@ * then its set to sortFinishTime. * @param sortFinishTime */ - void setSortFinishTime(long sortFinishTime) { - this.sortFinishTime = sortFinishTime; - if (0 == this.shuffleFinishTime){ - this.shuffleFinishTime = sortFinishTime; - } - } + void setSortFinishTime(long sortFinishTime) {} /** * Get start time of the task. @@ -176,10 +163,19 @@ } /** * Set current phase of this task. - * @param p + * @param phase phase of this task */ - void setPhase(Phase p){ - this.phase = p; + void setPhase(Phase phase){ + TaskStatus.Phase oldPhase = getPhase(); + if (oldPhase != phase){ + // sort phase started + if (phase == TaskStatus.Phase.SORT){ + setShuffleFinishTime(System.currentTimeMillis()); + }else if (phase == TaskStatus.Phase.REDUCE){ + setSortFinishTime(System.currentTimeMillis()); + } + } + this.phase = phase; } /** * Get task's counters. @@ -194,13 +190,81 @@ public void setCounters(Counters counters) { this.counters = counters; } + + /** + * Get the list of maps from which output-fetches failed. + * + * @return the list of maps from which output-fetches failed. + */ + public List<String> getFetchFailedMaps() { + return null; + } + + /** + * Add to the list of maps from which output-fetches failed. + * + * @param mapTaskId map from which fetch failed + */ + synchronized void addFetchFailedMap(String mapTaskId) {} + + /** + * Update the status of the task. 
+ * + * @param progress + * @param state + * @param phase + * @param counters + */ + synchronized void statusUpdate(float progress, String state, + Counters counters) { + setRunState(TaskStatus.State.RUNNING); + setProgress(progress); + setStateString(state); + setCounters(counters); + } + + /** + * Update the status of the task. + * + * @param status updated status + */ + synchronized void statusUpdate(TaskStatus status) { + this.progress = status.getProgress(); + this.runState = status.getRunState(); + this.diagnosticInfo = status.getDiagnosticInfo(); + this.stateString = status.getStateString(); + + if (status.getStartTime() != 0) { + this.startTime = status.getStartTime(); + } + if (status.getFinishTime() != 0) { + this.finishTime = status.getFinishTime(); + } + this.phase = status.getPhase(); + this.counters = status.getCounters(); + } + + /** + * Clear out transient information after sending out a status update + * to the {@link TaskTracker}. + */ + synchronized void clearStatus() {} + + public Object clone() { + try { + return super.clone(); + } catch (CloneNotSupportedException cnse) { + // Shouldn't happen since we do implement Clonable + throw new InternalError(cnse.toString()); + } + } + ////////////////////////////////////////////// // Writable ////////////////////////////////////////////// public void write(DataOutput out) throws IOException { UTF8.writeString(out, taskid); - out.writeBoolean(isMap); out.writeFloat(progress); WritableUtils.writeEnum(out, runState); UTF8.writeString(out, diagnosticInfo); @@ -208,16 +272,11 @@ WritableUtils.writeEnum(out, phase); out.writeLong(startTime); out.writeLong(finishTime); - if (!isMap){ - out.writeLong(shuffleFinishTime); - out.writeLong(sortFinishTime); - } counters.write(out); } public void readFields(DataInput in) throws IOException { this.taskid = UTF8.readString(in); - this.isMap = in.readBoolean(); this.progress = in.readFloat(); this.runState = WritableUtils.readEnum(in, State.class);
this.diagnosticInfo = UTF8.readString(in); @@ -225,12 +284,51 @@ this.phase = WritableUtils.readEnum(in, Phase.class); this.startTime = in.readLong(); this.finishTime = in.readLong(); - if (!this.isMap){ - shuffleFinishTime = in.readLong(); - sortFinishTime = in.readLong(); - } counters = new Counters(); counters.readFields(in); + } + + ////////////////////////////////////////////////////////////////////////////// + // Factory-like methods to create/read/write appropriate TaskStatus objects + ////////////////////////////////////////////////////////////////////////////// + + static TaskStatus createTaskStatus(DataInput in, String taskId, float progress, + State runState, String diagnosticInfo, + String stateString, String taskTracker, + Phase phase, Counters counters) + throws IOException { + boolean isMap = in.readBoolean(); + return createTaskStatus(isMap, taskId, progress, runState, diagnosticInfo, + stateString, taskTracker, phase, counters); + } + + static TaskStatus createTaskStatus(boolean isMap, String taskId, float progress, + State runState, String diagnosticInfo, + String stateString, String taskTracker, + Phase phase, Counters counters) { + return (isMap) ? new MapTaskStatus(taskId, progress, runState, + diagnosticInfo, stateString, taskTracker, + phase, counters) : + new ReduceTaskStatus(taskId, progress, runState, + diagnosticInfo, stateString, + taskTracker, phase, counters); + } + + static TaskStatus createTaskStatus(boolean isMap) { + return (isMap) ? 
new MapTaskStatus() : new ReduceTaskStatus(); + } + + static TaskStatus readTaskStatus(DataInput in) throws IOException { + boolean isMap = in.readBoolean(); + TaskStatus taskStatus = createTaskStatus(isMap); + taskStatus.readFields(in); + return taskStatus; + } + + static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) + throws IOException { + out.writeBoolean(taskStatus.getIsMap()); + taskStatus.write(out); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Aug 23 09:36:00 2007 @@ -669,7 +669,7 @@ tip.setJobConf(jobConf); tip.launchTask(); } catch (Throwable ie) { - tip.runstate = TaskStatus.State.FAILED; + tip.taskStatus.setRunState(TaskStatus.State.FAILED); try { tip.cleanup(); } catch (Throwable ie2) { @@ -906,15 +906,9 @@ // if (status == null) { synchronized (this) { - List<TaskStatus> taskReports = - new ArrayList<TaskStatus>(runningTasks.size()); - for (TaskInProgress tip: runningTasks.values()) { - taskReports.add(tip.createStatus()); - } - status = - new TaskTrackerStatus(taskTrackerName, localHostname, - httpPort, taskReports, - failures); + status = new TaskTrackerStatus(taskTrackerName, localHostname, + httpPort, cloneAndResetRunningTaskStatuses(), + failures); } } else { LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() + @@ -963,6 +957,12 @@ runningTasks.remove(taskStatus.getTaskId()); } } + + // Clear transient status information which should only + // be sent once to the JobTracker + for (TaskInProgress tip: runningTasks.values()) { + tip.getStatus().clearStatus(); + } } // Force a rebuild of 'status' on 
the next iteration @@ -1265,8 +1265,6 @@ /////////////////////////////////////////////////////// class TaskInProgress { Task task; - float progress; - volatile TaskStatus.State runstate; long lastProgressReport; StringBuffer diagnosticInfo = new StringBuffer(); private TaskRunner runner; @@ -1283,19 +1281,18 @@ */ public TaskInProgress(Task task, JobConf conf) { this.task = task; - this.progress = 0.0f; - this.runstate = TaskStatus.State.UNASSIGNED; this.lastProgressReport = System.currentTimeMillis(); this.defaultJobConf = conf; localJobConf = null; - taskStatus = new TaskStatus(task.getTaskId(), - task.isMapTask(), - progress, runstate, - diagnosticInfo.toString(), - "initializing", - getName(), task.isMapTask()? TaskStatus.Phase.MAP: - TaskStatus.Phase.SHUFFLE, - task.getCounters()); + taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskId(), + 0.0f, + TaskStatus.State.UNASSIGNED, + diagnosticInfo.toString(), + "initializing", + getName(), + task.isMapTask()? TaskStatus.Phase.MAP: + TaskStatus.Phase.SHUFFLE, + task.getCounters()); taskTimeout = (10 * 60 * 1000); } @@ -1350,14 +1347,12 @@ /** */ - public synchronized TaskStatus createStatus() { - taskStatus.setProgress(progress); - taskStatus.setRunState(runstate); + public synchronized TaskStatus getStatus() { taskStatus.setDiagnosticInfo(diagnosticInfo.toString()); - if (diagnosticInfo.length() > 0) { diagnosticInfo = new StringBuffer(); } + return taskStatus; } @@ -1366,7 +1361,7 @@ */ public synchronized void launchTask() throws IOException { localizeTask(task); - this.runstate = TaskStatus.State.RUNNING; + this.taskStatus.setRunState(TaskStatus.State.RUNNING); this.runner = task.createRunner(TaskTracker.this); this.runner.start(); this.taskStatus.setStartTime(System.currentTimeMillis()); @@ -1375,31 +1370,18 @@ /** * The task is reporting its progress */ - public synchronized void reportProgress(float p, String state, - TaskStatus.Phase newPhase, - Counters counters) + public synchronized 
void reportProgress(TaskStatus taskStatus) { if (this.done) { //make sure we ignore progress messages after a task has //invoked TaskUmbilicalProtocol.done() return; } - LOG.info(task.getTaskId()+" "+p+"% "+state); - this.progress = p; - this.runstate = TaskStatus.State.RUNNING; + + LOG.info(task.getTaskId() + " " + taskStatus.getProgress() + + "% " + taskStatus.getStateString()); + this.taskStatus.statusUpdate(taskStatus); this.lastProgressReport = System.currentTimeMillis(); - TaskStatus.Phase oldPhase = taskStatus.getPhase(); - if (oldPhase != newPhase){ - // sort phase started - if (newPhase == TaskStatus.Phase.SORT){ - this.taskStatus.setShuffleFinishTime(System.currentTimeMillis()); - }else if (newPhase == TaskStatus.Phase.REDUCE){ - this.taskStatus.setSortFinishTime(System.currentTimeMillis()); - } - this.taskStatus.setPhase(newPhase); - } - this.taskStatus.setStateString(state); - this.taskStatus.setCounters(counters); } /** @@ -1411,7 +1393,7 @@ /** */ public TaskStatus.State getRunState() { - return runstate; + return taskStatus.getRunState(); } /** @@ -1434,10 +1416,12 @@ * The task is reporting that it's done running */ public synchronized void reportDone() { - LOG.info("Task " + task.getTaskId() + " is done."); - this.progress = 1.0f; + this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + this.taskStatus.setProgress(1.0f); this.taskStatus.setFinishTime(System.currentTimeMillis()); this.done = true; + + LOG.info("Task " + task.getTaskId() + " is done."); } /** @@ -1464,19 +1448,19 @@ boolean needCleanup = false; synchronized (this) { if (done) { - runstate = TaskStatus.State.SUCCEEDED; + taskStatus.setRunState(TaskStatus.State.SUCCEEDED); } else { if (!wasKilled) { failures += 1; - runstate = TaskStatus.State.FAILED; + taskStatus.setRunState(TaskStatus.State.FAILED); } else { - runstate = TaskStatus.State.KILLED; + taskStatus.setRunState(TaskStatus.State.KILLED); } - progress = 0.0f; + taskStatus.setProgress(0.0f); } 
this.taskStatus.setFinishTime(System.currentTimeMillis()); - needCleanup = (runstate == TaskStatus.State.FAILED) | - (runstate == TaskStatus.State.KILLED); + needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || + taskStatus.getRunState() == TaskStatus.State.KILLED); } // @@ -1517,20 +1501,21 @@ * @param wasFailure was it a failure (versus a kill request)? */ public synchronized void kill(boolean wasFailure) throws IOException { - if (runstate == TaskStatus.State.RUNNING) { + if (taskStatus.getRunState() == TaskStatus.State.RUNNING) { wasKilled = true; if (wasFailure) { failures += 1; } runner.kill(); - runstate = - (wasFailure) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED; - } else if (runstate == TaskStatus.State.UNASSIGNED) { + taskStatus.setRunState((wasFailure) ? + TaskStatus.State.FAILED : + TaskStatus.State.KILLED); + } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { if (wasFailure) { failures += 1; - runstate = TaskStatus.State.FAILED; + taskStatus.setRunState(TaskStatus.State.FAILED); } else { - runstate = TaskStatus.State.KILLED; + taskStatus.setRunState(TaskStatus.State.KILLED); } } } @@ -1540,10 +1525,11 @@ */ private synchronized void mapOutputLost(String failure ) throws IOException { - if (runstate == TaskStatus.State.SUCCEEDED) { + if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) { + // change status to failure LOG.info("Reporting output lost:"+task.getTaskId()); - runstate = TaskStatus.State.FAILED; // change status to failure - progress = 0.0f; + taskStatus.setRunState(TaskStatus.State.FAILED); + taskStatus.setProgress(0.0f); reportDiagnosticInfo("Map output lost, rescheduling: " + failure); runningTasks.put(task.getTaskId(), this); @@ -1567,7 +1553,7 @@ synchronized (TaskTracker.this) { tasks.remove(taskId); if (alwaysKeepTaskFiles || - (runstate == TaskStatus.State.FAILED && + (taskStatus.getRunState() == TaskStatus.State.FAILED && keepFailedTaskFiles)) { return; } @@ -1618,14 +1604,12 @@ /** 
* Called periodically to report Task progress, from 0.0 to 1.0. */ - public synchronized boolean progress(String taskid, float progress, - String state, - TaskStatus.Phase phase, - Counters counters - ) throws IOException { + public synchronized boolean statusUpdate(String taskid, + TaskStatus taskStatus) + throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { - tip.reportProgress(progress, state, phase, counters); + tip.reportProgress(taskStatus); return true; } else { LOG.warn("Progress from unknown child task: "+taskid); @@ -1663,6 +1647,18 @@ } } + + /** + * A reduce-task failed to shuffle the map-outputs. Kill the task. + */ + public synchronized void shuffleError(String taskId, String message) + throws IOException { + LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message); + TaskInProgress tip = runningTasks.get(taskId); + tip.reportDiagnosticInfo("Shuffle Error: " + message); + purgeTask(tip, true); + } + /** * A child task had a local filesystem error. Kill the task. */ @@ -1826,6 +1822,15 @@ return taskTrackerName; } + private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() { + List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size()); + for(TaskInProgress tip: runningTasks.values()) { + TaskStatus status = tip.getStatus(); + result.add((TaskStatus)status.clone()); + status.clearStatus(); + } + return result; + } /** * Get the list of tasks that will be reported back to the * job tracker in the next heartbeat cycle. 
@@ -1834,7 +1839,7 @@ synchronized List<TaskStatus> getRunningTaskStatuses() { List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size()); for(TaskInProgress tip: runningTasks.values()) { - result.add(tip.createStatus()); + result.add(tip.getStatus()); } return result; } @@ -1847,7 +1852,7 @@ List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size()); for(Map.Entry<String, TaskInProgress> task: tasks.entrySet()) { if (!runningTasks.containsKey(task.getKey())) { - result.add(task.getValue().createStatus()); + result.add(task.getValue().getStatus()); } } return result; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Aug 23 09:36:00 2007 @@ -156,29 +156,24 @@ UTF8.writeString(out, trackerName); UTF8.writeString(out, host); out.writeInt(httpPort); + out.writeInt(failures); out.writeInt(taskReports.size()); - out.writeInt(failures); - for (Iterator it = taskReports.iterator(); it.hasNext();) { - ((TaskStatus) it.next()).write(out); + for (TaskStatus taskStatus : taskReports) { + TaskStatus.writeTaskStatus(out, taskStatus); } } - /** - */ public void readFields(DataInput in) throws IOException { this.trackerName = UTF8.readString(in); this.host = UTF8.readString(in); this.httpPort = in.readInt(); + this.failures = in.readInt(); taskReports.clear(); - int numTasks = in.readInt(); - this.failures = in.readInt(); for (int i = 0; i < numTasks; i++) { - TaskStatus tmp = new TaskStatus(); - tmp.readFields(in); - taskReports.add(tmp); + taskReports.add(TaskStatus.readTaskStatus(in)); } } } Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=569063&r1=569062&r2=569063&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Aug 23 09:36:00 2007 @@ -29,26 +29,31 @@ * and parent is via this protocol. */ interface TaskUmbilicalProtocol extends VersionedProtocol { - /** Changed the version to 2, since we have a new method getMapOutputs + /** + * Changed the version to 2, since we have a new method getMapOutputs * Changed version to 3 to have progress() return a boolean + * Changed the version to 4, since we have replaced + * TaskUmbilicalProtocol.progress(String, float, String, + * org.apache.hadoop.mapred.TaskStatus.Phase, Counters) + * with {@link #statusUpdate(String, TaskStatus)} * */ - public static final long versionID = 3L; + public static final long versionID = 4L; /** Called when a child task process starts, to get its task.*/ Task getTask(String taskid) throws IOException; - /** Report child's progress to parent. - * @param taskid the id of the task - * @param progress value between zero and one - * @param state description of task's current state - * @param phase current phase of the task. - * @param counters the counters for this task. + /** + * Report child's progress to parent.
+ * + * @param taskId task-id of the child + * @param taskStatus status of the child + * @throws IOException + * @throws InterruptedException * @return True if the task is known */ - boolean progress(String taskid, float progress, String state, - TaskStatus.Phase phase, Counters counters) - throws IOException, InterruptedException; - + boolean statusUpdate(String taskId, TaskStatus taskStatus) + throws IOException, InterruptedException; + /** Report error messages back to parent. Calls should be sparing, since all * such messages are held in the job tracker. * @param taskid the id of the task involved @@ -65,6 +70,9 @@ * the task process exits without calling this. */ void done(String taskid) throws IOException; + /** Report that a reduce-task couldn't shuffle map-outputs.*/ + void shuffleError(String taskId, String message) throws IOException; + /** Report that the task encounted a local filesystem error.*/ void fsError(String taskId, String message) throws IOException;