Author: acmurthy
Date: Fri Dec 21 10:57:55 2007
New Revision: 606268

URL: http://svn.apache.org/viewvc?rev=606268&view=rev
Log:
Merge -r 606266:606267 from trunk to branch-0.15 to fix HADOOP-2247.
Modified:
    lucene/hadoop/branches/branch-0.15/CHANGES.txt
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Fri Dec 21 10:57:55 2007
@@ -39,6 +39,14 @@
     round-robin disk selections randomly. This helps in spreading data across
     multiple partitions much better. (acmurthy)
 
+    HADOOP-2247. Fine-tune the strategies for killing mappers and reducers
+    due to failures while fetching map-outputs. The JobTracker now takes the
+    map-completion times and the number of currently running reduces into
+    account before killing a mapper, while the reducer takes its own progress
+    and the number of fetch-failures vis-a-vis the total number of
+    fetch-attempts into account before killing itself.
+    (Amar Kamat via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-2160. Remove project-level, non-user documentation from

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 21 10:57:55 2007
@@ -79,6 +79,9 @@
   // The maximum percentage of trackers in cluster added to the 'blacklist'.
   private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
 
+  // The maximum percentage of fetch failures allowed for a map
+  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+
   // No. of tasktrackers in the cluster
   private volatile int clusterSize = 0;
 
@@ -405,6 +408,8 @@
                                TaskCompletionEvent.Status.SUCCEEDED,
                                httpTaskLogLocation
                                );
+      taskEvent.setTaskRunTime((int)(status.getFinishTime()
+                                     - status.getStartTime()));
       tip.setSuccessEventNumber(taskCompletionEventTracker);
     }
     //For a failed task update the JT datastructures.For the task state where
@@ -1164,7 +1169,13 @@
     LOG.info("Failed fetch notification #" + fetchFailures + " for task " +
              mapTaskId);
 
-    if (fetchFailures == MAX_FETCH_FAILURES_NOTIFICATIONS) {
+    float failureRate = (float)fetchFailures / runningReduceTasks;
+    // declare the map faulty if the failure rate crosses the allowed threshold
+    boolean isMapFaulty =
+      (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT);
+    if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
+        && isMapFaulty) {
       LOG.info("Too many fetch-failures for output of task: " + mapTaskId +
                " ... killing it");
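
For clarity, the rule that the JobInProgress hunk above introduces can be read
in isolation. The following is a minimal sketch, assuming a standalone class
and a value of 3 for MAX_FETCH_FAILURES_NOTIFICATIONS (that constant is
defined outside this diff); only the 0.5 threshold and the rate formula are
taken from the patch:

    // Sketch of the JobTracker-side check added in the hunk above.
    // MAX_FETCH_FAILURES_NOTIFICATIONS = 3 is an assumed value; only the
    // 0.5 threshold appears in this diff.
    public class MapFaultSketch {

      private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
      private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;

      /**
       * @param fetchFailures      failed-fetch notifications for one map's output
       * @param runningReduceTasks reduces currently trying to fetch that output
       * @return whether the completed map should be declared faulty and re-run
       */
      static boolean isMapFaulty(int fetchFailures, int runningReduceTasks) {
        float failureRate = (float) fetchFailures / runningReduceTasks;
        return fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
            && failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT;
      }

      public static void main(String[] args) {
        // 3 complaints from 20 running reduces: only a 15% failure rate, so
        // the map survives, although the old "== 3" rule would have re-run it.
        System.out.println(isMapFaulty(3, 20)); // false
        // 5 complaints from 8 running reduces: 62.5% failure rate, re-run it.
        System.out.println(isMapFaulty(5, 8));  // true
      }
    }

The point of the rate test is to avoid re-executing a healthy map just because
a few flaky reducers repeatedly failed to fetch from it.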
killing it"); Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=606268&r1=606267&r2=606268&view=diff ============================================================================== --- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Dec 21 10:57:55 2007 @@ -476,11 +476,31 @@ private long ramfsMergeOutputSize; /** - * Maximum no. of fetch-retries per-map. + * the max of all the map completion times + */ + private int maxMapRuntime; + + /** + * Maximum number of fetch-retries per-map. */ private static final int MAX_FETCH_RETRIES_PER_MAP = 5; /** + * Maximum percent of failed fetch attempt before killing the reduce task. + */ + private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f; + + /** + * Minimum percent of progress required to keep the reduce alive. + */ + private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f; + + /** + * Maximum percent of shuffle execution time required to keep the reducer alive. + */ + private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f; + + /** * Maximum no. of unique maps from which we failed to fetch map-outputs * even after [EMAIL PROTECTED] #MAX_FETCH_RETRIES_PER_MAP} retries; after this the * reduce task is failed. @@ -862,12 +882,14 @@ (this.reduceTask.getPartition()%10) ); this.random = new Random(randomSeed); + this.maxMapRuntime = 0; } public boolean fetchOutputs() throws IOException { final int numOutputs = reduceTask.getNumMaps(); List<MapOutputLocation> knownOutputs = new ArrayList<MapOutputLocation>(numCopiers); + int totalFailures = 0; int numInFlight = 0, numCopied = 0; int lowThreshold = numCopiers*2; long bytesTransferred = 0; @@ -896,6 +918,7 @@ // start the clock for bandwidth measurement long startTime = System.currentTimeMillis(); long currentTime = startTime; + long lastProgressTime = System.currentTimeMillis(); IntWritable fromEventId = new IntWritable(0); try { @@ -1005,6 +1028,7 @@ if (cr != null) { if (cr.getSuccess()) { // a successful copy numCopied++; + lastProgressTime = System.currentTimeMillis(); bytesTransferred += cr.getSize(); long secsSinceStart = @@ -1033,6 +1057,7 @@ String mapTaskId = cr.getLocation().getMapTaskId(); Integer mapId = cr.getLocation().getMapId(); + totalFailures++; Integer noFailedFetches = mapTaskToFailedFetchesMap.get(mapTaskId); noFailedFetches = @@ -1056,8 +1081,43 @@ fetchFailedMaps.add(mapId); // did we have too many unique failed-fetch maps? - if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) { - LOG.fatal("Shuffle failed with too many fetch failures! " + + // and did we fail on too many fetch attempts? + // and did we progress enough + // or did we wait for too long without any progress? 
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Fri Dec 21 10:57:55 2007
@@ -33,6 +33,7 @@
     private int eventId;
     private String taskTrackerHttp;
+    private int taskRunTime; // int suffices: this is a time difference, not a timestamp
     private String taskId;
     Status status;
     boolean isMap = false;
@@ -95,6 +96,22 @@
     public String getTaskTrackerHttp() {
       return taskTrackerHttp;
     }
+
+    /**
+     * Returns the time (in milliseconds) the task took to complete.
+     */
+    public int getTaskRunTime() {
+      return taskRunTime;
+    }
+
+    /**
+     * Sets the task completion time.
+     * @param taskCompletionTime time (in milliseconds) the task took to complete
+     */
+    public void setTaskRunTime(int taskCompletionTime) {
+      this.taskRunTime = taskCompletionTime;
+    }
+
     /**
      * set event Id. should be assigned incrementally starting from 0.
      * @param eventId
@@ -153,6 +170,7 @@
       out.writeBoolean(isMap);
       WritableUtils.writeEnum(out, status);
       WritableUtils.writeString(out, taskTrackerHttp);
+      WritableUtils.writeVInt(out, taskRunTime);
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -161,5 +179,6 @@
       this.isMap = in.readBoolean();
       this.status = WritableUtils.readEnum(in, Status.class);
       this.taskTrackerHttp = WritableUtils.readString(in);
+      this.taskRunTime = WritableUtils.readVInt(in);
     }
   }
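
The taskRunTime field serialized above feeds maxMapRuntime in ReduceTask,
which in turn scales the per-map fetch-retry budget (see the @@ -1251 hunk).
A sketch of that computation follows, assuming BACKOFF_INIT = 4000 ms and a
hypothetical closestPowerOf2 that returns the number of doublings, i.e.
ceil(log2(value)); neither the constant's value nor getClosestPowerOf2's
definition appears in this diff:

    // Sketch of the retry-budget scaling; BACKOFF_INIT's value and the
    // semantics of closestPowerOf2 are assumptions, not taken from the diff.
    public final class RetryScalingSketch {

      private static final int BACKOFF_INIT = 4000; // assumed initial backoff, ms

      // number of doublings needed to reach value, i.e. ceil(log2(value));
      // a plausible reading of getClosestPowerOf2, which the diff calls but
      // does not define
      static int closestPowerOf2(int value) {
        int power = 0;
        int approx = 1;
        while (approx < value) {
          power++;
          approx <<= 1;
        }
        return power;
      }

      static int maxFetchRetriesPerMap(int maxMapRuntime) {
        // with the backoff doubling each retry from BACKOFF_INIT, n retries
        // wait about BACKOFF_INIT * (2^n - 1) ms in total; choosing
        // n = ceil(log2(runtime/BACKOFF_INIT + 1)) makes that total roughly
        // the slowest map's runtime, long enough for it to be re-executed
        return closestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
      }

      public static void main(String[] args) {
        System.out.println(maxFetchRetriesPerMap(60000)); // 4 under this reading
        System.out.println(maxFetchRetriesPerMap(4000));  // 1
      }
    }

Tying the retry budget to the slowest map's runtime means a reducer waits
long enough for a killed map to be re-run before it concludes that the
output is truly lost, instead of using one fixed retry count for all jobs.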