Author: yhemanth
Date: Thu Jan 15 16:25:36 2009
New Revision: 734869
URL: http://svn.apache.org/viewvc?rev=734869&view=rev
Log:
Merge -r 734867:734868 from trunk to branch 0.20 to fix HADOOP-4988.
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=734869&r1=734868&r2=734869&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Jan 15 16:25:36 2009
@@ -539,6 +539,9 @@
HADOOP-4977. Fix a deadlock between the reclaimCapacity and assignTasks
in capacity scheduler. (Vivek Ratan via yhemanth)
+ HADOOP-4988. Fix reclaim capacity to work even when there are queues with
+ no capacity. (Vivek Ratan via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified:
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=734869&r1=734868&r2=734869&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
(original)
+++
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Thu Jan 15 16:25:36 2009
@@ -405,26 +405,16 @@
return -1;
}
else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
- // neither needs to reclaim. If either doesn't have a capacity yet,
- // it comes at the end of the queue.
- if ((t1.guaranteedCapacity == 0) &&
- (t2.guaranteedCapacity != 0)) {
- return 1;
- } else if ((t1.guaranteedCapacity != 0) &&
- (t2.guaranteedCapacity == 0)) {
- return -1;
- } else if ((t1.guaranteedCapacity == 0) &&
- (t2.guaranteedCapacity == 0)) {
- // both don't have capacities, treat them as equal.
- return 0;
- } else {
- // look at how much capacity they've filled
- double r1 =
(double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
- double r2 =
(double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
- if (r1<r2) return -1;
- else if (r1>r2) return 1;
- else return 0;
- }
+ // neither needs to reclaim.
+ // look at how much capacity they've filled. Treat a queue with gc=0
+ // as equivalent to a queue running at capacity
+ double r1 = (0 == t1.guaranteedCapacity)? 1.0f:
+ (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
+ double r2 = (0 == t2.guaranteedCapacity)? 1.0f:
+ (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
+ if (r1<r2) return -1;
+ else if (r1>r2) return 1;
+ else return 0;
}
else {
// both have to reclaim. Look at which one needs to reclaim earlier
@@ -768,12 +758,10 @@
// collections are up-to-date.
private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws
IOException {
for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
- if (getTSI(qsi).guaranteedCapacity <= 0.0f) {
- // No capacity is guaranteed yet for this queue.
- // Queues are sorted so that ones without capacities
- // come towards the end. Hence, we can simply return
- // from here without considering any further queues.
- return TaskLookupResult.getNoTaskFoundResult();
+ // we may have queues with gc=0. We shouldn't look at jobs from
+ // these queues
+ if (0 == getTSI(qsi).guaranteedCapacity) {
+ continue;
}
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
Modified:
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=734869&r1=734868&r2=734869&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Thu Jan 15 16:25:36 2009
@@ -1415,6 +1415,52 @@
}
+ // test code to reclaim capacity with one queue having zero GC
+ // (HADOOP-4988).
+ // Simple test: reclaim capacity should work even if one of the
+ // queues has a gc of 0.
+ public void testReclaimCapacityWithZeroGC() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "q2", "q3"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ // we want q3 to have 0 GC. Map slots = 4.
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 40.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q3", 10.0f, 1000, true, 25));
+ // note: because of the way we convert gc% into actual gc, q2's gc
+ // will be 1, not 2.
+ resConf.setFakeQueues(queues);
+ resConf.setReclaimCapacityInterval(500);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // set up a situation where q2 is under capacity, and default
+ // is over capacity
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null,
"u1");
+ //FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3",
"u1");
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ // now submit a job to q2
+ FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2",
"u1");
+ // get scheduler to notice that q2 needs to reclaim
+ scheduler.reclaimCapacity();
+ // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
+ // we start reclaiming when 15 secs are left.
+ clock.advance(400000);
+ scheduler.reclaimCapacity();
+ // no tasks should have been killed yet
+ assertEquals(j1.runningMapTasks, 4);
+ clock.advance(200000);
+ scheduler.reclaimCapacity();
+ // task from j1 will be killed
+ assertEquals(j1.runningMapTasks, 3);
+
+ }
+
/*
* Following is the testing strategy for testing scheduling information.
* - start capacity scheduler with two queues.