Author: matei
Date: Mon Jan 12 12:42:29 2009
New Revision: 733895
URL: http://svn.apache.org/viewvc?rev=733895&view=rev
Log:
HADOOP-4943. Fair share scheduler does not utilize all slots if the task
trackers are configured heterogeneously.
Contributed by Zheng Shao.
Modified:
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified:
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Mon Jan 12 12:42:29 2009
@@ -31,24 +31,22 @@
* out uniformly across the nodes rather than being clumped up on whichever
* machines sent out heartbeats earliest.
*/
- int getCap(TaskTrackerStatus tracker,
- int totalRunnableTasks, int localMaxTasks) {
- int numTaskTrackers = taskTrackerManager.taskTrackers().size();
- return Math.min(localMaxTasks,
- (int) Math.ceil((double) totalRunnableTasks / numTaskTrackers));
+ int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) {
+ double load = ((double)totalRunnableTasks) / totalSlots;
+ return (int) Math.ceil(localMaxTasks * Math.min(1.0, load));
}
@Override
public boolean canAssignMap(TaskTrackerStatus tracker,
- int totalRunnableMaps) {
- return tracker.countMapTasks() < getCap(tracker, totalRunnableMaps,
- tracker.getMaxMapTasks());
+ int totalRunnableMaps, int totalMapSlots) {
+ return tracker.countMapTasks() < getCap(totalRunnableMaps,
+ tracker.getMaxMapTasks(), totalMapSlots);
}
@Override
public boolean canAssignReduce(TaskTrackerStatus tracker,
- int totalRunnableReduces) {
- return tracker.countReduceTasks() < getCap(tracker, totalRunnableReduces,
- tracker.getMaxReduceTasks());
+ int totalRunnableReduces, int totalReduceSlots) {
+ return tracker.countReduceTasks() < getCap(totalRunnableReduces,
+ tracker.getMaxReduceTasks(), totalReduceSlots);
}
}
Modified:
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Mon Jan 12 12:42:29 2009
@@ -232,14 +232,20 @@
runnableMaps += runnableTasks(job, TaskType.MAP);
runnableReduces += runnableTasks(job, TaskType.REDUCE);
}
+
+ // Compute total map/reduce slots
+ // In the future we can precompute this if the Scheduler becomes a
+ // listener of tracker join/leave events.
+ int totalMapSlots = getTotalSlots(TaskType.MAP);
+ int totalReduceSlots = getTotalSlots(TaskType.REDUCE);
// Scan to see whether any job needs to run a map, then a reduce
ArrayList<Task> tasks = new ArrayList<Task>();
TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
for (TaskType taskType: types) {
boolean canAssign = (taskType == TaskType.MAP) ?
- loadMgr.canAssignMap(tracker, runnableMaps) :
- loadMgr.canAssignReduce(tracker, runnableReduces);
+ loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) :
+ loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots);
if (canAssign) {
// Figure out the jobs that need this type of task
List<JobInProgress> candidates = new ArrayList<JobInProgress>();
Modified:
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Mon Jan 12 12:42:29 2009
@@ -63,17 +63,19 @@
* Can a given {@link TaskTracker} run another map task?
* @param tracker The machine we wish to run a new map on
* @param totalRunnableMaps Set of running jobs in the cluster
+ * @param totalMapSlots The total number of map slots in the cluster
* @return true if another map can be launched on <code>tracker</code>
*/
public abstract boolean canAssignMap(TaskTrackerStatus tracker,
- int totalRunnableMaps);
+ int totalRunnableMaps, int totalMapSlots);
/**
* Can a given {@link TaskTracker} run another reduce task?
* @param tracker The machine we wish to run a new map on
- * @param totalReducesNeeded Set of running jobs in the cluster
+ * @param totalRunnableReduces Set of running jobs in the cluster
+ * @param totalReduceSlots The total number of reduce slots in the cluster
* @return true if another reduce can be launched on <code>tracker</code>
*/
public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
- int totalRunnableReduces);
+ int totalRunnableReduces, int totalReduceSlots);
}
Modified:
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=733895&r1=733894&r2=733895&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Jan 12 12:42:29 2009
@@ -1150,6 +1150,24 @@
assertEquals(0.28, info4.mapFairShare, 0.01);
assertEquals(0.28, info4.reduceFairShare, 0.01);
}
+
+ /**
+ * Tests that max-running-tasks per node are set by assigning load
+ * equally across the cluster in CapBasedLoadManager.
+ */
+ public void testCapBasedLoadManager() {
+ CapBasedLoadManager loadMgr = new CapBasedLoadManager();
+ // Arguments to getCap: totalRunnableTasks, nodeCap, totalSlots
+ // Desired behavior: return ceil(nodeCap * min(1, runnableTasks/totalSlots))
+ assertEquals(1, loadMgr.getCap(1, 1, 100));
+ assertEquals(1, loadMgr.getCap(1, 2, 100));
+ assertEquals(1, loadMgr.getCap(1, 10, 100));
+ assertEquals(1, loadMgr.getCap(200, 1, 100));
+ assertEquals(1, loadMgr.getCap(1, 5, 100));
+ assertEquals(3, loadMgr.getCap(50, 5, 100));
+ assertEquals(5, loadMgr.getCap(100, 5, 100));
+ assertEquals(5, loadMgr.getCap(200, 5, 100));
+ }
private void advanceTime(long time) {
clock.advance(time);