Author: yhemanth
Date: Mon Jan 5 03:35:22 2009
New Revision: 731530
URL: http://svn.apache.org/viewvc?rev=731530&view=rev
Log:
HADOOP-4979. Fix capacity scheduler to block cluster for failed high RAM
requirements across task types. Contributed by Vivek Ratan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=731530&r1=731529&r2=731530&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jan 5 03:35:22 2009
@@ -527,6 +527,9 @@
HADOOP-4949. Fix native compilation. (Chris Douglas via rangadi)
+ HADOOP-4979. Fix capacity scheduler to block cluster for failed high
+ RAM requirements across task types. (Vivek Ratan via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=731530&r1=731529&r2=731530&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Mon Jan 5 03:35:22 2009
@@ -843,7 +843,8 @@
TaskLookUpStatus.NO_TASK_IN_JOB, msg);
}
- private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+ // don't return null
+ private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
Task t = null;
/*
@@ -867,7 +868,8 @@
// Queues are sorted so that ones without capacities
// come towards the end. Hence, we can simply return
// from here without considering any further queues.
- return null;
+ return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
+ null);
}
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -878,10 +880,9 @@
}
if (lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
- t = tlr.getTask();
// we have a task. Update reclaimed resource info
updateReclaimedResources(qsi);
- return Collections.singletonList(t);
+ return tlr;
}
if (lookUpStatus ==
TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
@@ -891,13 +892,14 @@
LOG.warn(msg);
LOG.warn("Returning nothing to the Tasktracker "
+ taskTracker.trackerName);
- return null;
+ return tlr;
}
}
}
// nothing to give
- return null;
+ return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
+ null);
}
private void printQSIs() {
@@ -1307,7 +1309,7 @@
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {
- List<Task> tasks = null;
+ TaskLookupResult tlr;
/*
* If TT has Map and Reduce slot free, we need to figure out whether to
* give it a Map or Reduce task.
@@ -1324,22 +1326,54 @@
int currentMapTasks = taskTracker.countMapTasks();
int maxReduceTasks = taskTracker.getMaxReduceTasks();
int currentReduceTasks = taskTracker.countReduceTasks();
+
if ((maxReduceTasks - currentReduceTasks) >
(maxMapTasks - currentMapTasks)) {
- tasks = reduceScheduler.assignTasks(taskTracker);
+ // get a reduce task first
+ tlr = reduceScheduler.assignTasks(taskTracker);
+ if (TaskLookUpStatus.TASK_FOUND ==
+ tlr.getLookUpStatus()) {
+ // found a task; return
+ return Collections.singletonList(tlr.getTask());
+ }
+ else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS ==
+ tlr.getLookUpStatus()) {
+ // return no task
+ return null;
+ }
// if we didn't get any, look at map tasks, if TT has space
- if ((null == tasks) && (maxMapTasks > currentMapTasks)) {
- tasks = mapScheduler.assignTasks(taskTracker);
+ else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE ==
+ tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+ tlr = mapScheduler.assignTasks(taskTracker);
+ if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+ return Collections.singletonList(tlr.getTask());
+ }
}
}
else {
- tasks = mapScheduler.assignTasks(taskTracker);
- // if we didn't get any, look at red tasks, if TT has space
- if ((null == tasks) && (maxReduceTasks > currentReduceTasks)) {
- tasks = reduceScheduler.assignTasks(taskTracker);
+ // get a map task first
+ tlr = mapScheduler.assignTasks(taskTracker);
+ if (TaskLookUpStatus.TASK_FOUND ==
+ tlr.getLookUpStatus()) {
+ // found a task; return
+ return Collections.singletonList(tlr.getTask());
+ }
+ else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS ==
+ tlr.getLookUpStatus()) {
+ // return no task
+ return null;
+ }
+ // if we didn't get any, look at reduce tasks, if TT has space
+ else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE ==
+ tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+ tlr = reduceScheduler.assignTasks(taskTracker);
+ if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+ return Collections.singletonList(tlr.getTask());
+ }
}
}
- return tasks;
+
+ return null;
}
/**
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=731530&r1=731529&r2=731530&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon Jan 5 03:35:22 2009
@@ -1826,6 +1826,77 @@
}
/**
+ * Test HADOOP-4979.
+ * Bug fix for making sure we always return null to TT if there is a
+ * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
+ * or vice-versa.
+ * @throws IOException
+ */
+ public void testHighMemoryBlocking()
+ throws IOException {
+
+ // 2 map and 1 reduce slots
+ taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+
+ TaskTrackerStatus.ResourceStatus ttStatus =
+ taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+ ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+ ttStatus.setReservedVirtualMemory(0);
+ ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
+ ttStatus.setReservedPhysicalMemory(0);
+ // Normal job on this TT would be 1GB vmem, 0.5GB pmem
+
+ taskTrackerManager.addQueues(new String[] { "default" });
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ // enabled memory-based scheduling
+ scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+ 1 * 1024 * 1024 * 1024L);
+ scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+ 3 * 1024 * 1024 * 1024L);
+ resConf.setDefaultPercentOfPmemInVmem(33.3f);
+ resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // We need a situation where the scheduler needs to run a map task,
+ // but the available one has a high-mem requirement. There should
+ // be another job whose maps or reduces can run, but they shouldn't
+ // be scheduled.
+
+ LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
+ + "2 map tasks");
+ JobConf jConf = new JobConf();
+ jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
+ jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+ LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
+ + "2 map/red tasks");
+ jConf = new JobConf();
+ jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
+ jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // first, a map from j1 will run
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // at this point, the scheduler tries to schedule another map from j1.
+ // there isn't enough space. There is space to run the second job's
+ // map or reduce task, but they shouldn't be scheduled
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ }
+
+ /**
* test invalid highMemoryJobs
* @throws IOException
*/