Author: omalley
Date: Fri Mar 4 03:25:48 2011
New Revision: 1076956
URL: http://svn.apache.org/viewvc?rev=1076956&view=rev
Log:
commit 5f8761f9b4042f0fb1e47846dd93713d8903cd67
Author: Lee Tucker <[email protected]>
Date: Thu Jul 30 17:40:44 2009 -0700
Applying patch 2871273.mr722.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1076956&r1=1076955&r2=1076956&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 03:25:48 2011
@@ -529,21 +529,22 @@ class CapacityTaskScheduler extends Task
continue;
}
} else {
- //if memory requirements don't match then we check if the
- //job has either pending or speculative task or has insufficient number
- //of 'reserved' tasktrackers to cover all pending tasks. If so
- //we reserve the current tasktracker for this job so that
- //high memory jobs are not starved
- if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus) ||
- !hasSufficientReservedTaskTrackers(j)) {
+ // if memory requirements don't match then we check if the job has
+ // pending tasks and has insufficient number of 'reserved'
+ // tasktrackers to cover all pending tasks. If so we reserve the
+ // current tasktracker for this job so that high memory jobs are not
+ // starved
+ if ((getPendingTasks(j) != 0 && !hasSufficientReservedTaskTrackers(j))) {
// Reserve all available slots on this tasktracker
- LOG.info(j.getJobID() + ": Reserving " + taskTracker.getTrackerName() +
- " since memory-requirements don't match");
- taskTracker.reserveSlots(type, j, taskTracker.getAvailableSlots(type));
-
+ LOG.info(j.getJobID() + ": Reserving "
+ + taskTracker.getTrackerName()
+ + " since memory-requirements don't match");
+ taskTracker.reserveSlots(type, j, taskTracker
+ .getAvailableSlots(type));
+
// Block
return TaskLookupResult.getMemFailedResult();
- }
+ }
}//end of memory check block
// if we're here, this job has no task to run. Look at the next job.
}//end of for loop
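
The change in the hunk above narrows the reservation rule: a tasktracker is now reserved for a high-memory job only while the job still has pending tasks and the trackers already reserved for it cannot cover them; the old clause that also reserved for speculative tasks is gone. The standalone sketch below illustrates that decision rule under simplified assumptions. FakeJob, its fields, and the "one reserved tracker per pending task" notion of sufficiency are hypothetical stand-ins, not CapacityTaskScheduler APIs; only the shape of the condition mirrors the patched code.

// Minimal, self-contained sketch of the post-patch reservation decision.
// All names here are hypothetical; only the condition's shape follows the diff.
public class ReservationCheckSketch {

  /** Hypothetical job snapshot: pending task count and reserved trackers. */
  static class FakeJob {
    int pendingTasks;          // tasks still waiting for a slot
    int reservedTaskTrackers;  // trackers already reserved for this job

    FakeJob(int pendingTasks, int reservedTaskTrackers) {
      this.pendingTasks = pendingTasks;
      this.reservedTaskTrackers = reservedTaskTrackers;
    }

    boolean hasSufficientReservedTaskTrackers() {
      // Assumption for this sketch: "sufficient" means at least one reserved
      // tracker per pending task.
      return reservedTaskTrackers >= pendingTasks;
    }
  }

  /** Post-patch rule: reserve only if pending work is not already covered. */
  static boolean shouldReserveTracker(FakeJob job) {
    return job.pendingTasks != 0 && !job.hasSufficientReservedTaskTrackers();
  }

  public static void main(String[] args) {
    // 2 pending high-memory maps, no reservations yet -> reserve this tracker
    System.out.println(shouldReserveTracker(new FakeJob(2, 0))); // true
    // 2 pending maps, 2 trackers already reserved -> stop reserving
    System.out.println(shouldReserveTracker(new FakeJob(2, 2))); // false
  }
}
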
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076956&r1=1076955&r2=1076956&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:25:48 2011
@@ -2048,30 +2048,29 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
- // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run.
+ // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt2")));
assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt2")));
- // reserved tasktrackers contribute to occupied slots
- // for maps, both tasktrackers are reserved.
- checkOccupiedSlots("default", TaskType.MAP, 1, 7, 175.0f);
- // for reduces, only one tasktracker is reserved, because
- // the reduce scheduler is not visited for tt1 (as it has
- // 0 slots free).
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 5,
- 125.0f);
+
+ // reserved tasktrackers contribute to occupied slots for maps.
+ checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
+ // occupied slots for reduces remain unchanged as tt1 is not reserved for
+ // reduces.
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
LOG.info(job2.getSchedulingInfo());
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 4, 1, 2, 2),
+ 1, 2, 2, 1, 2, 0),
(String) job2.getSchedulingInfo());
assertEquals(String.format(
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
0, 0, 0, 0, 0, 0),
(String) job3.getSchedulingInfo());
+
+ // One reservation is already done for job2. So job3 should go ahead.
+ checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
}
/**
@@ -2574,18 +2573,14 @@ public class TestCapacityScheduler exten
}
/**
- * Test case to test scheduling of jobs with speculative execution
- * in the face of high RAM jobs.
- *
- * Essentially, the test verifies that if a high RAM job has speculative
- * tasks that cannot run because of memory requirements, we block
- * that node and do not return any tasks to it.
- *
+ * Test to verify that TTs are reserved for high memory jobs, but only till a
+ * TT is reserved for each of the pending task.
* @throws IOException
*/
- public void testHighRamJobWithSpeculativeExecution() throws IOException {
- // 2 TTs, 3 map and 3 reduce slots on each TT
- taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
+ public void testTTReservingWithHighMemoryJobs()
+ throws IOException {
+ // 3 taskTrackers, 2 map and 0 reduce slots on each TT
+ taskTrackerManager = new FakeTaskTrackerManager(3, 2, 0);
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2593,96 +2588,105 @@ public class TestCapacityScheduler exten
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
- // 1GB for each map, 1GB for each reduce
+ // Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
- 3 * 1024L);
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
- 3 * 1024L);
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- // Submit a normal job that should occupy a node
+ LOG.debug("Submit a regular memory(1GB vmem maps/reduces) job of "
+ + "3 map/red tasks");
JobConf jConf = new JobConf(conf);
+ jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(0);
- jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(0);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(3);
+ jConf.setNumReduceTasks(3);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job1 = submitJob(JobStatus.PREP, jConf);
-
- //Submit a high memory job with speculative tasks.
- jConf = new JobConf();
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // assign one map task of job1 on all the TTs
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkAssignment("tt3", "attempt_test_0001_m_000003_0 on tt3");
+ scheduler.updateQSIInfoForTests();
+
+ LOG.info(job1.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 3, 3, 0, 0,
+ 0, 0), (String) job1.getSchedulingInfo());
+
+ LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+ + "2 map tasks");
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(0);
- jConf.setNumMapTasks(1);
+ jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
- jConf.setMapSpeculativeExecution(true);
- jConf.setReduceSpeculativeExecution(false);
- FakeJobInProgress job2 =
- new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
- taskTrackerManager, "u1");
- taskTrackerManager.submitJob(job2);
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- //Submit normal job
+ LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+ + "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(0);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(0);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
- jConf.setMapSpeculativeExecution(false);
- jConf.setReduceSpeculativeExecution(false);
- FakeJobInProgress job3 = submitJob(JobStatus.PREP, jConf);
-
- controlledInitializationPoller.selectJobsToInitialize();
- raiseStatusChangeEvents(scheduler.jobQueuesManager);
+ FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
- // Have one node on which all tasks of job1 are scheduled.
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ // Job2, a high memory job cannot be accommodated on a any TT. But with each
+ // trip to the scheduler, each of the TT should be reserved by job2.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ scheduler.updateQSIInfoForTests();
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 2, 0,
+ 0, 0), (String) job2.getSchedulingInfo());
- // raise events to initialize the 3rd job
- controlledInitializationPoller.selectJobsToInitialize();
- raiseStatusChangeEvents(scheduler.jobQueuesManager);
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ scheduler.updateQSIInfoForTests();
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+ 0, 0), (String) job2.getSchedulingInfo());
- // On the second node, one task of the high RAM job can be scheduled.
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
- checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
- assertEquals("pending maps greater than zero " , job2.pendingMaps(), 0);
- // Total 4 map slots should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
-
- // now when the first node gets back, it cannot run any task
- // because job2 has a speculative task that can run on this node.
- // This is even though job3's tasks can run on this node.
+ // Job2 has only 2 pending tasks. So no more reservations. Job3 should get
+ // slots on tt3. tt1 and tt2 should not be assigned any slots with the
+ // reservation stats intact.
assertNull(scheduler.assignTasks(tracker("tt1")));
- // Reservation will count for 2 more slots.
- checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
-
- // finish one task from tt1.
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0",
- job1);
-
- // now, we can schedule the speculative task on tt1
- checkAssignment("tt1", "attempt_test_0002_m_000001_1 on tt1");
-
- // finish one more task from tt1.
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0",
- job1);
-
- // now the new job's tasks can be scheduled.
- checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
- }
+ scheduler.updateQSIInfoForTests();
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+ 0, 0), (String) job2.getSchedulingInfo());
+
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ scheduler.updateQSIInfoForTests();
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+ 0, 0), (String) job2.getSchedulingInfo());
+
+ checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+ scheduler.updateQSIInfoForTests();
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(String.format(
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+ 0, 0), (String) job2.getSchedulingInfo());
+
+ // No more tasks there in job3 also
+ assertNull(scheduler.assignTasks(tracker("tt3")));
+}
/**
* Test to verify that queue ordering is based on the number of slots occupied
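
For reference, the scenario the new testTTReservingWithHighMemoryJobs walks through above: three tasktrackers with two map slots each, a regular job occupying one map on each, and a high-memory job with two pending maps. The scheduler reserves tt1 and tt2 on successive passes, then stops reserving because the two pending tasks are already covered, which leaves tt3 free for the next job. The sketch below simulates only that capping behavior; every name in it is hypothetical and none comes from the scheduler or the test harness.

import java.util.ArrayList;
import java.util.List;

// Hypothetical walk-through of the capping behavior the new test asserts:
// reservations for a high-memory job stop once every pending task has a
// reserved tracker, so later trackers stay free for other jobs.
public class ReservationCapSketch {
  public static void main(String[] args) {
    int pendingHighMemMaps = 2;              // job2 in the test has 2 pending maps
    List<String> reservedTrackers = new ArrayList<>();
    String[] trackers = {"tt1", "tt2", "tt3"};

    for (String tt : trackers) {
      if (reservedTrackers.size() < pendingHighMemMaps) {
        // not enough reserved trackers yet -> reserve this one, assign nothing
        reservedTrackers.add(tt);
        System.out.println(tt + ": reserved for the high-memory job");
      } else {
        // pending tasks already covered -> the slot goes to the next job
        System.out.println(tt + ": free, assigned to the regular job");
      }
    }
    // Expected output:
    // tt1: reserved for the high-memory job
    // tt2: reserved for the high-memory job
    // tt3: free, assigned to the regular job
  }
}
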