Author: cutting
Date: Wed Aug 9 06:48:10 2006
New Revision: 430052

URL: http://svn.apache.org/viewvc?rev=430052&view=rev
Log:
HADOOP-400.  Improvements to task assignment.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Aug 9 06:48:10 2006
@@ -11,6 +11,11 @@
     Solaris.  This was causing nightly builds to fail.
     (Michel Tourn via cutting)
 
+ 3. HADOOP-400.  Improvements to task assignment.  Tasks are no longer
+    re-run on nodes where they have failed (unless no other node is
+    available).  Also, tasks are better load-balanced among nodes.
+    (omalley via cutting)
+
 
 Release 0.5.0 - 2006-08-04
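
The load-balancing half of this entry is implemented by the JobTracker.java
hunks below, which replace the average-plus-epsilon admission test with a
hard per-node ceiling. A minimal standalone sketch of that ceiling, assuming
made-up cluster numbers (the variable names mirror the patch; nothing here
is the actual JobTracker code):

    // Sketch of the per-node task ceiling introduced in JobTracker.
    // Names mirror the patch; the sample values are illustrative only.
    public class LoadCapExample {
      public static void main(String[] args) {
        int maxCurrentTasks = 2;    // configured task slots per tracker
        int numTaskTrackers = 10;   // live trackers in the cluster
        int remainingMapLoad = 15;  // unfinished maps across running jobs

        // ceil(15 / 10) = 2: the backlog is spread across the cluster
        // instead of the first trackers to poll absorbing all of it.
        int maxMapLoad = Math.min(maxCurrentTasks,
                                  (int) Math.ceil((double) remainingMapLoad /
                                                  numTaskTrackers));
        System.out.println("per-node map cap = " + maxMapLoad); // prints 2
      }
    }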
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Aug 9 06:48:10 2006
@@ -311,167 +311,132 @@
     /**
      * Return a MapTask, if appropriate, to run on the given tasktracker
      */
-    public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) {
+    public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) {
+      if (! tasksInited) {
+        LOG.info("Cannot create task split for " + profile.getJobId());
+        return null;
+      }
+      ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
+      double avgProgress = status.mapProgress() / maps.length;
+      int target = findNewTask(tts, clusterSize, avgProgress,
+                               maps, firstMapToTry, mapCache);
+      if (target == -1) {
+        return null;
+      }
+      boolean wasRunning = maps[target].isRunning();
+      Task result = maps[target].getTaskToRun(tts.getTrackerName());
+      if (!wasRunning) {
+        runningMapTasks += 1;
+      }
+      return result;
+    }
+
+    /**
+     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
+     * We don't have cache-sensitivity for reduce tasks, as they
+     * work on temporary MapRed files.
+     */
+    public Task obtainNewReduceTask(TaskTrackerStatus tts,
+                                    int clusterSize) {
       if (! tasksInited) {
         LOG.info("Cannot create task split for " + profile.getJobId());
         return null;
       }
-      Task t = null;
-      int cacheTarget = -1;
-      int stdTarget = -1;
-      int specTarget = -1;
-      int failedTarget = -1;
-
-      //
-      // We end up creating two tasks for the same bucket, because
-      // we call obtainNewMapTask() really fast, twice in a row.
-      // There's not enough time for the "recentTasks"
-      //
-
-      //
-      // Compute avg progress through the map tasks
-      //
-      double avgProgress = status.mapProgress() / maps.length;
-
+      double avgProgress = status.reduceProgress() / reduces.length;
+      int target = findNewTask(tts, clusterSize, avgProgress,
+                               reduces, firstReduceToTry, null);
+      if (target == -1) {
+        return null;
+      }
+      boolean wasRunning = reduces[target].isRunning();
+      Task result = reduces[target].getTaskToRun(tts.getTrackerName());
+      if (!wasRunning) {
+        runningReduceTasks += 1;
+      }
+      return result;
+    }
+
+    /**
+     * Find a new task to run.
+     * @param tts The task tracker that is asking for a task
+     * @param clusterSize The number of task trackers in the cluster
+     * @param avgProgress The average progress of this kind of task in this job
+     * @param tasks The list of potential tasks to try
+     * @param firstTaskToTry The first index in tasks to check
+     * @param cachedTasks A list of tasks that would like to run on this node
+     * @return the index in tasks of the selected task (or -1 for no task)
+     */
+    private int findNewTask(TaskTrackerStatus tts,
+                            int clusterSize,
+                            double avgProgress,
+                            TaskInProgress[] tasks,
+                            int firstTaskToTry,
+                            List cachedTasks) {
+      String taskTracker = tts.getTrackerName();
       //
       // See if there is a split over a block that is stored on
       // the TaskTracker checking in.  That means the block
       // doesn't have to be transmitted from another node.
       //
-      ArrayList hostMaps = (ArrayList)hostToMaps.get(tts.getHost());
-      if (hostMaps != null) {
-        Iterator i = hostMaps.iterator();
+      if (cachedTasks != null) {
+        Iterator i = cachedTasks.iterator();
         while (i.hasNext()) {
           TaskInProgress tip = (TaskInProgress)i.next();
-          if (tip.hasTask() && !tip.hasFailedOnMachine(taskTracker)) {
-            LOG.info("Found task with local split for "+tts.getHost());
-            cacheTarget = tip.getIdWithinJob();
-            i.remove();
-            break;
+          i.remove();
+          if (tip.isRunnable() &&
+              !tip.isRunning() &&
+              !tip.hasFailedOnMachine(taskTracker)) {
+            LOG.info("Choosing cached task " + tip.getTIPId());
+            int cacheTarget = tip.getIdWithinJob();
+            return cacheTarget;
           }
         }
       }
+
       //
       // If there's no cached target, see if there's
       // a std. task to run.
       //
-      if (cacheTarget < 0) {
-        for (int i = 0; i < maps.length; i++) {
-          int realIdx = (i + firstMapToTry) % maps.length;
-          if (maps[realIdx].hasTask()) {
-            if (stdTarget < 0) {
-              if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
-                if (failedTarget < 0) {
-                  failedTarget = realIdx;
-                }
-              } else {
-                stdTarget = realIdx;
-                break;
-              }
-            }
-          }
-        }
-      }
-
-      //
-      // If no cached-target and no std target, see if
-      // there's a speculative task to run.
-      //
-      if (cacheTarget < 0 && stdTarget < 0) {
-        for (int i = 0; i < maps.length; i++) {
-          int realIdx = (i + firstMapToTry) % maps.length;
-          if (maps[realIdx].hasSpeculativeTask(avgProgress)) {
-            if (!maps[realIdx].hasFailedOnMachine(taskTracker)) {
-              specTarget = realIdx;
-              break;
-            }
-          }
-        }
-      }
-
-      //
-      // Run whatever we found
-      //
-      if (cacheTarget >= 0) {
-        t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        runningMapTasks += 1;
-      } else if (stdTarget >= 0) {
-        t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        runningMapTasks += 1;
-      } else if (specTarget >= 0) {
-        //should always be true, but being paranoid
-        boolean isRunning = maps[specTarget].isRunning();
-        t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        if (!isRunning){
-          runningMapTasks += 1;
-        }
-      } else if (failedTarget >= 0) {
-        //should always be false, but being paranoid again
-        boolean isRunning = maps[failedTarget].isRunning();
-        t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        if (!isRunning) {
-          runningMapTasks += 1;
-        }
-      }
-      return t;
-    }
-
-    /**
-     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
-     * We don't have cache-sensitivity for reduce tasks, as they
-     * work on temporary MapRed files.
-     */
-    public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) {
-      if (! tasksInited) {
-        LOG.info("Cannot create task split for " + profile.getJobId());
-        return null;
-      }
-
-      Task t = null;
-      int stdTarget = -1;
-      int specTarget = -1;
       int failedTarget = -1;
-      double avgProgress = status.reduceProgress() / reduces.length;
-
-      for (int i = 0; i < reduces.length; i++) {
-        int realIdx = (i + firstReduceToTry) % reduces.length;
-        if (reduces[realIdx].hasTask()) {
-          if (reduces[realIdx].hasFailedOnMachine(taskTracker)) {
-            if (failedTarget < 0) {
-              failedTarget = realIdx;
-            }
-          } else if (stdTarget < 0) {
-            stdTarget = realIdx;
-          }
-        } else if (reduces[realIdx].hasSpeculativeTask(avgProgress)) {
-          if (specTarget < 0 &&
-              !reduces[realIdx].hasFailedOnMachine(taskTracker)) {
-            specTarget = realIdx;
-          }
+      int specTarget = -1;
+      for (int i = 0; i < tasks.length; i++) {
+        int realIdx = (i + firstTaskToTry) % tasks.length;
+        TaskInProgress task = tasks[realIdx];
+        if (task.isRunnable()) {
+          // if it failed here and we haven't tried every machine, we
+          // don't schedule it here.
+          boolean hasFailed = task.hasFailedOnMachine(taskTracker);
+          if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
+            continue;
+          }
+          boolean isRunning = task.isRunning();
+          if (hasFailed) {
+            // failed tasks that aren't running can be scheduled as a last
+            // resort
+            if (!isRunning && failedTarget == -1) {
+              failedTarget = realIdx;
+            }
+          } else {
+            if (!isRunning) {
+              LOG.info("Choosing normal task " + tasks[realIdx].getTIPId());
+              return realIdx;
+            } else if (specTarget == -1 &&
+                       task.hasSpeculativeTask(avgProgress)) {
+              specTarget = realIdx;
+            }
          }
        }
      }
-
-      if (stdTarget >= 0) {
-        t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        runningReduceTasks += 1;
-      } else if (specTarget >= 0) {
-        //should be false
-        boolean isRunning = reduces[specTarget].isRunning();
-        t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        if (!isRunning){
-          runningReduceTasks += 1;
-        }
-      } else if (failedTarget >= 0) {
-        boolean isRunning = reduces[failedTarget].isRunning();
-        t = reduces[failedTarget].getTaskToRun(taskTracker, tts,
-                                               avgProgress);
-        if (!isRunning){
-          runningReduceTasks += 1;
-        }
+      if (specTarget != -1) {
+        LOG.info("Choosing speculative task " +
+                 tasks[specTarget].getTIPId());
+      } else if (failedTarget != -1) {
+        LOG.info("Choosing failed task " +
+                 tasks[failedTarget].getTIPId());
      }
-      return t;
+      return specTarget != -1 ? specTarget : failedTarget;
    }

    /**
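
The method above encodes a four-step preference: a cached (data-local) task,
then an idle runnable task, then a speculative copy of a slow running task,
and only as a last resort a task that already failed on this tracker, and
then only once it has failed on every machine in the cluster. A condensed,
self-contained sketch of that chain, assuming a hypothetical TIP stand-in
for TaskInProgress (the firstTaskToTry rotation, cache pruning, and logging
are omitted):

    import java.util.List;

    // TIP is a hypothetical miniature of TaskInProgress with only the
    // fields this sketch consults; it is not the real class.
    class TIP {
      boolean runnable, running, speculative;
      boolean failedHere;   // failed on the asking tracker?
      int failedMachines;   // trackers this task has failed on
    }

    public class TaskChoiceSketch {
      /** Returns the chosen index into tasks, or -1 when nothing fits. */
      static int choose(List<TIP> cached, TIP[] tasks, int clusterSize) {
        // 1. Prefer a task whose input block lives on this node.
        if (cached != null) {
          for (TIP tip : cached) {
            if (tip.runnable && !tip.running && !tip.failedHere) {
              return indexOf(tasks, tip);
            }
          }
        }
        int specTarget = -1, failedTarget = -1;
        for (int i = 0; i < tasks.length; i++) {
          TIP t = tasks[i];
          if (!t.runnable) continue;
          // Skip a task that failed here unless it has failed everywhere.
          if (t.failedHere && t.failedMachines < clusterSize) continue;
          if (t.failedHere) {
            if (!t.running && failedTarget == -1) failedTarget = i; // 4. last resort
          } else if (!t.running) {
            return i;                                               // 2. idle task
          } else if (specTarget == -1 && t.speculative) {
            specTarget = i;                                         // 3. speculative
          }
        }
        return specTarget != -1 ? specTarget : failedTarget;
      }

      private static int indexOf(TIP[] tasks, TIP tip) {
        for (int i = 0; i < tasks.length; i++) if (tasks[i] == tip) return i;
        return -1;
      }
    }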
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Aug 9 06:48:10 2006
@@ -779,13 +779,17 @@
         int remainingMapLoad = 0;
         int numTaskTrackers;
         TaskTrackerStatus tts;
-        int avgMapLoad = 0;
-        int avgReduceLoad = 0;
 
         synchronized (taskTrackers) {
           numTaskTrackers = taskTrackers.size();
           tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
         }
+        if (tts == null) {
+          LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
+          return null;
+        }
+
+        int totalCapacity = numTaskTrackers * maxCurrentTasks;
+
         synchronized(jobsByArrival){
           for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
             JobInProgress job = (JobInProgress) it.next();
@@ -797,19 +801,23 @@
             }
           }
         }
-
+
+        // find out the maximum number of maps or reduces that we are willing
+        // to run on any node.
+        int maxMapLoad = 0;
+        int maxReduceLoad = 0;
         if (numTaskTrackers > 0) {
-          avgMapLoad = remainingMapLoad / numTaskTrackers;
-          avgReduceLoad = remainingReduceLoad / numTaskTrackers;
+          maxMapLoad = Math.min(maxCurrentTasks,
+                                (int) Math.ceil((double) remainingMapLoad /
+                                                numTaskTrackers));
+          maxReduceLoad = Math.min(maxCurrentTasks,
+                                   (int) Math.ceil((double) remainingReduceLoad
+                                                   / numTaskTrackers));
         }
-        int totalCapacity = numTaskTrackers * maxCurrentTasks;
+
        //
        // Get map + reduce counts for the current tracker.
        //
-        if (tts == null) {
-          LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
-          return null;
-        }
        int numMaps = tts.countMapTasks();
        int numReduces = tts.countReduceTasks();
@@ -823,18 +831,12 @@
 
        //
        // We hand a task to the current taskTracker if the given machine
-        // has a workload that's equal to or less than the pendingMaps average.
-        // This way the maps are launched if the TaskTracker has running tasks
-        // less than the pending average
-        // +/- TASK_ALLOC_EPSILON.  (That epsilon is in place in case
-        // there is an odd machine that is failing for some reason but
-        // has not yet been removed from the pool, making capacity seem
-        // larger than it really is.)
+        // has a workload that's less than the maximum load of that kind of
+        // task.
        //
        synchronized (jobsByArrival) {
-            if ((numMaps < maxCurrentTasks) &&
-                (numMaps <= avgMapLoad + 1 + TASK_ALLOC_EPSILON)) {
+            if (numMaps < maxMapLoad) {
 
               int totalNeededMaps = 0;
               for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
@@ -843,7 +845,7 @@
                     continue;
                 }
 
-                Task t = job.obtainNewMapTask(taskTracker, tts);
+                Task t = job.obtainNewMapTask(tts, numTaskTrackers);
                 if (t != null) {
                   expireLaunchingTasks.addNewTask(t.getTaskId());
                   myMetrics.launchMap();
@@ -870,17 +872,17 @@
            //
            // Same thing, but for reduce tasks
            //
-            if ((numReduces < maxCurrentTasks) &&
-                (numReduces <= avgReduceLoad + 1 + TASK_ALLOC_EPSILON)) {
+            if (numReduces < maxReduceLoad) {
 
               int totalNeededReduces = 0;
               for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
                 JobInProgress job = (JobInProgress) it.next();
-                if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+                if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+                        job.numReduceTasks == 0) {
                     continue;
                 }
 
-                Task t = job.obtainNewReduceTask(taskTracker, tts);
+                Task t = job.obtainNewReduceTask(tts, numTaskTrackers);
                 if (t != null) {
                   expireLaunchingTasks.addNewTask(t.getTaskId());
                   myMetrics.launchReduce();
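
The two admission hunks above are easiest to compare side by side: the old
test let a tracker take work whenever it sat near the cluster average, so a
fast-polling node could absorb a small job by itself; the new test enforces
the per-node ceiling computed from the remaining load. A hedged contrast
sketch (the TASK_ALLOC_EPSILON value is assumed, not taken from this diff;
neither method is the actual JobTracker body):

    // Illustrative contrast of the old and new map-admission tests.
    public class AdmissionSketch {
      static final double TASK_ALLOC_EPSILON = 0.01; // assumed value

      // Old rule: under the slot limit and near the cluster-wide average.
      static boolean oldRule(int numMaps, int maxCurrentTasks, int avgMapLoad) {
        return numMaps < maxCurrentTasks
            && numMaps <= avgMapLoad + 1 + TASK_ALLOC_EPSILON;
      }

      // New rule: strictly under the per-node ceiling (maxMapLoad).
      static boolean newRule(int numMaps, int maxMapLoad) {
        return numMaps < maxMapLoad;
      }

      public static void main(String[] args) {
        // One remaining map on a ten-node cluster: avgMapLoad rounds to 0,
        // so the old rule still admits a tracker already running a map
        // (1 <= 0 + 1 + eps), while the new ceiling ceil(1/10) = 1 does not.
        System.out.println(oldRule(1, 2, 0)); // true
        System.out.println(newRule(1, 1));    // false
      }
    }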
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Aug 9 06:48:10 2006
@@ -16,8 +16,10 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.*;
+import org.apache.hadoop.util.*;
 
 import java.text.NumberFormat;
+import java.io.*;
 import java.util.*;
@@ -391,21 +393,12 @@
     /////////////////////////////////////////////////
 
     /**
-     * Return whether this TIP has a non-speculative task to run
+     * Return whether this TIP still needs to run
      */
-    boolean hasTask() {
-      if (failed || isComplete() || recentTasks.size() > 0) {
-        return false;
-      } else {
-        for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) {
-          TaskStatus ts = (TaskStatus) it.next();
-          if (ts.getRunState() == TaskStatus.RUNNING) {
-            return false;
-          }
-        }
-        return true;
-      }
+    boolean isRunnable() {
+      return !failed && (completes == 0);
     }
+
     /**
      * Return whether the TIP has a speculative task to run.  We
     * only launch a speculative task if the current TIP is really
@@ -430,27 +423,24 @@
     /**
     * Return a Task that can be sent to a TaskTracker for execution.
     */
-    public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) {
+    public Task getTaskToRun(String taskTracker) {
       Task t = null;
-      if (hasTask() ||
-          hasSpeculativeTask(avgProgress)) {
-
-        String taskid = (String) usableTaskIds.first();
-        usableTaskIds.remove(taskid);
-        String jobId = job.getProfile().getJobId();
-
-        if (isMapTask()) {
-          t = new MapTask(jobId, jobFile, taskid, partition, split);
-        } else {
-          t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
-        }
-        t.setConf(conf);
-        recentTasks.add(taskid);
+      String taskid = (String) usableTaskIds.first();
+      usableTaskIds.remove(taskid);
+      String jobId = job.getProfile().getJobId();
 
-        // Ask JobTracker to note that the task exists
-        jobtracker.createTaskEntry(taskid, taskTracker, this);
+      if (isMapTask()) {
+        t = new MapTask(jobId, jobFile, taskid, partition, split);
+      } else {
+        t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
       }
+      t.setConf(conf);
+
+      recentTasks.add(taskid);
+
+      // Ask JobTracker to note that the task exists
+      jobtracker.createTaskEntry(taskid, taskTracker, this);
 
       return t;
     }
@@ -461,6 +451,14 @@
     */
    public boolean hasFailedOnMachine(String tracker) {
      return machinesWhereFailed.contains(tracker);
+    }
+
+    /**
+     * Get the number of machines where this task has failed.
+     * @return the size of the failed machine set
+     */
+    public int getNumberOfFailedMachines() {
+      return machinesWhereFailed.size();
    }

    /**
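
Taken together, the TaskInProgress changes shrink its scheduling surface to
three cheap queries (isRunnable(), hasFailedOnMachine(), and
getNumberOfFailedMachines()), with getTaskToRun() now assuming the caller
has already chosen this TIP. A stub summarizing that contract (the fields
and method bodies paraphrase the diff; the rest of the class is omitted):

    import java.util.HashSet;
    import java.util.Set;

    // Stub of the TaskInProgress query contract after this patch; the
    // fields and method bodies paraphrase the diff, nothing more.
    class TipContractSketch {
      private boolean failed;                 // permanently failed?
      private int completes;                  // successful completions
      private final Set<String> machinesWhereFailed = new HashSet<String>();

      /** Still needs to run: never failed for good, never completed. */
      boolean isRunnable() {
        return !failed && completes == 0;
      }

      /** Scheduling history used to steer a task away from bad nodes. */
      boolean hasFailedOnMachine(String tracker) {
        return machinesWhereFailed.contains(tracker);
      }

      /** Lets the scheduler retry an everywhere-failed task as a last resort. */
      int getNumberOfFailedMachines() {
        return machinesWhereFailed.size();
      }
    }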