Author: yhemanth
Date: Wed Jan 14 18:55:57 2009
New Revision: 734597
URL: http://svn.apache.org/viewvc?rev=734597&view=rev
Log:
HADOOP-4977. Fix a deadlock between the reclaimCapacity and assignTasks in
capacity scheduler. Contributed by Vivek Ratan.
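The deadlock being fixed is a lock-ordering inversion: the old TaskSchedulingMgr.reclaimCapacity() held the scheduler-side lock and then called into scheduler.taskTrackerManager (whose methods synchronize on the tracker-manager/JobTracker side), while assignTasks() is reached from the tracker-manager side and then takes the scheduler-side lock. The patch removes all tracker-manager calls from the synchronized scope and has an unsynchronized caller fetch the needed values (cluster status, next heartbeat interval) up front. A minimal, self-contained sketch of that pattern follows; the class names (TrackerManagerSketch, SchedulerMgrSketch, ReclaimCallerSketch) are hypothetical stand-ins, not the real Hadoop classes.

// Sketch only: assumes the two-lock relationship described in the patch
// comments (scheduler lock vs. tracker-manager lock). Not Hadoop code.
class TrackerManagerSketch {
  // stands in for the tracker-manager/JobTracker side; methods synchronize
  // on this object
  synchronized int getNextHeartbeatInterval() {
    return 3000;
  }
}

class SchedulerMgrSketch {
  private final TrackerManagerSketch ttm;

  SchedulerMgrSketch(TrackerManagerSketch ttm) {
    this.ttm = ttm;
  }

  // Pre-patch shape: takes the scheduler lock, then calls into the tracker
  // manager (scheduler -> tracker). Combined with assignTasksSketch() being
  // reached while the tracker lock is held (tracker -> scheduler), the two
  // orders can deadlock.
  synchronized void reclaimCapacityOld() {
    int interval = ttm.getNextHeartbeatInterval();
    // ... decide when reclaimed resources must be killed, using 'interval' ...
  }

  // Post-patch shape: the tracker-manager value is fetched by an
  // unsynchronized caller and passed in; nothing under this lock calls back
  // out to the tracker manager.
  synchronized void reclaimCapacityNew(int nextHeartbeatInterval) {
    // ... decide when reclaimed resources must be killed ...
  }

  // reached from the tracker-manager side during heartbeats
  synchronized void assignTasksSketch() {
    // ... pick a task for the tracker ...
  }
}

class ReclaimCallerSketch {
  // mirrors the unsynchronized CapacityTaskScheduler.reclaimCapacity() in the
  // diff below: gather tracker-side values first, then take scheduler locks
  static void reclaimCapacity(TrackerManagerSketch ttm, SchedulerMgrSketch mgr) {
    int nextHeartbeatInterval = ttm.getNextHeartbeatInterval();
    mgr.reclaimCapacityNew(nextHeartbeatInterval);
  }
}

In the patch itself, the outer reclaimCapacity() and assignTasks() follow this shape: they read the cluster status (and, for reclaim, the next heartbeat interval) before delegating to the synchronized per-type map and reduce schedulers.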
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=734597&r1=734596&r2=734597&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jan 14 18:55:57 2009
@@ -585,6 +585,9 @@
HADOOP-5026. Make chukwa/bin scripts executable in repository. (Andy
Konwinski via cdouglas)
+ HADOOP-4977. Fix a deadlock between the reclaimCapacity and assignTasks
+ in capacity scheduler. (Vivek Ratan via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=734597&r1=734596&r2=734597&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Jan 14 18:55:57 2009
@@ -160,6 +160,17 @@
* created.
*/
int numReclaimedResources = 0;
+
+ /**
+ * reset the variables associated with tasks
+ */
+ void resetTaskVars() {
+ numRunningTasks = 0;
+ numPendingTasks = 0;
+ for (String s: numRunningTasksByUser.keySet()) {
+ numRunningTasksByUser.put(s, 0);
+ }
+ }
/**
* return information about the tasks
@@ -350,8 +361,6 @@
*/
private static abstract class TaskSchedulingMgr {
- /** we keep track of the number of map or reduce slots we saw last */
- private int prevClusterCapacity = 0;
/** our TaskScheduler object */
protected CapacityTaskScheduler scheduler;
// can be replaced with a global type, if we have one
@@ -362,8 +371,6 @@
abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
JobInProgress job) throws IOException;
- abstract int getClusterCapacity();
- abstract int getRunningTasks(JobInProgress job);
abstract int getPendingTasks(JobInProgress job);
abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
@@ -464,14 +471,15 @@
* waiting)
* b. Check if a queue hasn't received enough of the resources it needed
* to be reclaimed and thus tasks need to be killed.
+ * The caller is responsible for ensuring that the QSI objects and the
+ * collections are up-to-date.
+ *
+ * Make sure that we do not make any calls to scheduler.taskTrackerManager
+ * as this can result in a deadlock (see HADOOP-4977).
*/
- private synchronized void reclaimCapacity() {
+ private synchronized void reclaimCapacity(int nextHeartbeatInterval) {
int tasksToKill = 0;
- // make sure we always get the latest values
- updateQSIObjects();
- updateCollectionOfQSIs();
-
QueueSchedulingInfo lastQsi =
qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
TaskSchedulingInfo lastTsi = getTSI(lastQsi);
@@ -513,11 +521,11 @@
// create a request for resources to be reclaimed
int amt = Math.min((tsi.guaranteedCapacity-usedCap),
(tsi.numPendingTasks - tsi.numReclaimedResources));
- // create a rsource object that needs to be reclaimed some time
+ // create a resource object that needs to be reclaimed some time
// in the future
long whenToKill = qsi.reclaimTime -
(CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING *
- scheduler.taskTrackerManager.getNextHeartbeatInterval());
+ nextHeartbeatInterval);
if (whenToKill < 0) whenToKill = 0;
tsi.reclaimList.add(new ReclaimedResource(amt,
currentTime + qsi.reclaimTime,
@@ -617,88 +625,6 @@
return tID;
}
-
- /**
- * Update individual QSI objects.
- * We don't need exact information for all variables, just enough for us
- * to make scheduling decisions. For example, we don't need an exact count
- * of numRunningTasks. Once we count upto the grid capacity (gcSum), any
- * number beyond that will make no difference.
- *
- * The pending task count is only required in reclaim capacity. So
- * if the computation becomes expensive, we can add a boolean to
- * denote if pending task computation is required or not.
- * */
- private synchronized void updateQSIObjects() {
- // if # of slots have changed since last time, update.
- // First, compute whether the total number of TT slots have changed
- int currentClusterCapacity = getClusterCapacity();
- for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
- TaskSchedulingInfo tsi = getTSI(qsi);
- // compute new GCs and ACs, if TT slots have changed
- if (currentClusterCapacity != prevClusterCapacity) {
- tsi.guaranteedCapacity =
- (int)(qsi.guaranteedCapacityPercent*currentClusterCapacity/100);
- }
- tsi.numRunningTasks = 0;
- tsi.numPendingTasks = 0;
- for (String s: tsi.numRunningTasksByUser.keySet()) {
- tsi.numRunningTasksByUser.put(s, 0);
- }
- // update stats on running jobs
- for (JobInProgress j:
- scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
- if (j.getStatus().getRunState() != JobStatus.RUNNING) {
- continue;
- }
- tsi.numRunningTasks += getRunningTasks(j);
- Integer i = tsi.numRunningTasksByUser.get(j.getProfile().getUser());
- tsi.numRunningTasksByUser.put(j.getProfile().getUser(),
- i+getRunningTasks(j));
- tsi.numPendingTasks += getPendingTasks(j);
- LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) =
" +
- j.runningMaps() + ", run(r) = " + j.runningReduces() +
- ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
- j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
- ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
- j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
- + ", total(m) = " + j.numMapTasks + ", total(r) = " +
- j.numReduceTasks);
- /*
- * it's fine walking down the entire list of running jobs - there
- * probably will not be many, plus, we may need to go through the
- * list to compute numRunningTasksByUser. If this is expensive, we
- * can keep a list of running jobs per user. Then we only need to
- * consider the first few jobs per user.
- */
- }
-
- //update stats on waiting jobs
- for(JobInProgress j :
- scheduler.jobQueuesManager.getJobs(qsi.queueName)) {
- // pending tasks
- if(tsi.numPendingTasks > currentClusterCapacity) {
- // that's plenty. no need for more computation
- break;
- }
- /*
- * Consider only the waiting jobs in the job queue. Job queue can
- * contain:
- * 1. Jobs which are in running state but not scheduled
- * (these would also be present in running queue), the pending
- * task count of these jobs is computed when scheduler walks
- * through running job queue.
- * 2. Jobs which are killed by user, but waiting job initialization
- * poller to walk through the job queue to clean up killed jobs.
- */
- if (j.getStatus().getRunState() == JobStatus.PREP) {
- tsi.numPendingTasks += getPendingTasks(j);
- }
- }
- }
- prevClusterCapacity = currentClusterCapacity;
- }
-
// called when a task is allocated to queue represented by qsi.
// update our info about reclaimed resources
private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
@@ -838,22 +764,9 @@
}
// Always return a TaskLookupResult object. Don't return null.
+ // The caller is responsible for ensuring that the QSI objects and the
+ // collections are up-to-date.
private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
- /*
- * update all our QSI objects.
- * This involves updating each qsi structure. This operation depends
- * on the number of running jobs in a queue, and some waiting jobs. If it
- * becomes expensive, do it once every few heartbeats only.
- */
- updateQSIObjects();
- LOG.debug("After updating QSI objects in " + this.type + " scheduler :");
- printQSIs();
- /*
- * sort list of queues first, as we want queues that need the most to
- * get first access. If this is expensive, sort every few heartbeats.
- * We're only sorting a collection of queues - there shouldn't be many.
- */
- updateCollectionOfQSIs();
for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
if (getTSI(qsi).guaranteedCapacity <= 0.0f) {
// No capacity is guaranteed yet for this queue.
@@ -886,6 +799,7 @@
return TaskLookupResult.getNoTaskFoundResult();
}
+ // for debugging.
private void printQSIs() {
StringBuffer s = new StringBuffer();
for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
@@ -1044,6 +958,10 @@
MemoryMatcher memoryMatcher = new MemoryMatcher(this);
+ /** we keep track of the number of map/reduce slots we saw last */
+ private int prevMapClusterCapacity = 0;
+ private int prevReduceClusterCapacity = 0;
+
/** name of the default queue. */
static final String DEFAULT_QUEUE_NAME = "default";
@@ -1269,22 +1187,135 @@
super.setConf(conf);
}
+ /**
+ * Reclaim capacity for both map & reduce tasks.
+ * Do not make this synchronized, since we call taskTrackerManager
+ * (see HADOOP-4977).
+ */
void reclaimCapacity() {
- mapScheduler.reclaimCapacity();
- reduceScheduler.reclaimCapacity();
+ // get the cluster capacity
+ ClusterStatus c = taskTrackerManager.getClusterStatus();
+ int mapClusterCapacity = c.getMaxMapTasks();
+ int reduceClusterCapacity = c.getMaxReduceTasks();
+ int nextHeartbeatInterval = taskTrackerManager.getNextHeartbeatInterval();
+ // update the QSI objects
+ updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+ // update the qsi collections, since we depend on their ordering
+ mapScheduler.updateCollectionOfQSIs();
+ reduceScheduler.updateCollectionOfQSIs();
+ // now, reclaim
+ mapScheduler.reclaimCapacity(nextHeartbeatInterval);
+ reduceScheduler.reclaimCapacity(nextHeartbeatInterval);
}
/**
* provided for the test classes
- * lets you update the QSI objects and sorted collection
+ * lets you update the QSI objects and sorted collections
*/
- void updateQSIInfo() {
- mapScheduler.updateQSIObjects();
+ void updateQSIInfoForTests() {
+ ClusterStatus c = taskTrackerManager.getClusterStatus();
+ int mapClusterCapacity = c.getMaxMapTasks();
+ int reduceClusterCapacity = c.getMaxReduceTasks();
+ // update the QSI objects
+ updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
mapScheduler.updateCollectionOfQSIs();
- reduceScheduler.updateQSIObjects();
reduceScheduler.updateCollectionOfQSIs();
}
-
+
+ /**
+ * Update individual QSI objects.
+ * We don't need exact information for all variables, just enough for us
+ * to make scheduling decisions. For example, we don't need an exact count
+ * of numRunningTasks. Once we count upto the grid capacity, any
+ * number beyond that will make no difference.
+ *
+ * The pending task count is only required in reclaim capacity. So
+ * if the computation becomes expensive, we can add a boolean to
+ * denote if pending task computation is required or not.
+ *
+ **/
+ private synchronized void updateQSIObjects(int mapClusterCapacity,
+ int reduceClusterCapacity) {
+ // if # of slots have changed since last time, update.
+ // First, compute whether the total number of TT slots have changed
+ for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+ // compute new GCs, if TT slots have changed
+ if (mapClusterCapacity != prevMapClusterCapacity) {
+ qsi.mapTSI.guaranteedCapacity =
+ (int)(qsi.guaranteedCapacityPercent*mapClusterCapacity/100);
+ }
+ if (reduceClusterCapacity != prevReduceClusterCapacity) {
+ qsi.reduceTSI.guaranteedCapacity =
+ (int)(qsi.guaranteedCapacityPercent*reduceClusterCapacity/100);
+ }
+ // reset running/pending tasks, tasks per user
+ qsi.mapTSI.resetTaskVars();
+ qsi.reduceTSI.resetTaskVars();
+ // update stats on running jobs
+ for (JobInProgress j:
+ jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+ if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+ continue;
+ }
+ int runningMaps = j.runningMaps();
+ int runningReduces = j.runningReduces();
+ qsi.mapTSI.numRunningTasks += runningMaps;
+ qsi.reduceTSI.numRunningTasks += runningReduces;
+ Integer i =
+ qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
+ qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
+ i+runningMaps);
+ i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
+ qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
+ i+runningReduces);
+ qsi.mapTSI.numPendingTasks += j.pendingMaps();
+ qsi.reduceTSI.numPendingTasks += j.pendingReduces();
+ LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
+ j.runningMaps() + ", run(r) = " + j.runningReduces() +
+ ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
+ j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
+ ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
+ j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
+ + ", total(m) = " + j.numMapTasks + ", total(r) = " +
+ j.numReduceTasks);
+ /*
+ * it's fine walking down the entire list of running jobs - there
+ * probably will not be many, plus, we may need to go through the
+ * list to compute numRunningTasksByUser. If this is expensive, we
+ * can keep a list of running jobs per user. Then we only need to
+ * consider the first few jobs per user.
+ */
+ }
+
+ //update stats on waiting jobs
+ for(JobInProgress j: jobQueuesManager.getJobs(qsi.queueName)) {
+ // pending tasks
+ if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
+ (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
+ // that's plenty. no need for more computation
+ break;
+ }
+ /*
+ * Consider only the waiting jobs in the job queue. Job queue can
+ * contain:
+ * 1. Jobs which are in running state but not scheduled
+ * (these would also be present in running queue), the pending
+ * task count of these jobs is computed when scheduler walks
+ * through running job queue.
+ * 2. Jobs which are killed by user, but waiting job initialization
+ * poller to walk through the job queue to clean up killed jobs.
+ */
+ if (j.getStatus().getRunState() == JobStatus.PREP) {
+ qsi.mapTSI.numPendingTasks += j.pendingMaps();
+ qsi.reduceTSI.numPendingTasks += j.pendingReduces();
+ }
+ }
+ }
+
+ prevMapClusterCapacity = mapClusterCapacity;
+ prevReduceClusterCapacity = reduceClusterCapacity;
+ }
+
/*
* The grand plan for assigning a task.
* First, decide whether a Map or Reduce task should be given to a TT
@@ -1309,20 +1340,34 @@
* Number of ways to do this. For now, base decision on how much is needed
* versus how much is used (default to Map, if equal).
*/
- LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() +
- ", run maps=" + taskTracker.countMapTasks() + ", max reds=" +
- taskTracker.getMaxReduceTasks() + ", run reds=" +
- taskTracker.countReduceTasks() + ", map cap=" +
- mapScheduler.getClusterCapacity() + ", red cap = " +
- reduceScheduler.getClusterCapacity());
+ ClusterStatus c = taskTrackerManager.getClusterStatus();
+ int mapClusterCapacity = c.getMaxMapTasks();
+ int reduceClusterCapacity = c.getMaxReduceTasks();
int maxMapTasks = taskTracker.getMaxMapTasks();
int currentMapTasks = taskTracker.countMapTasks();
int maxReduceTasks = taskTracker.getMaxReduceTasks();
int currentReduceTasks = taskTracker.countReduceTasks();
+ LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() +
+ ", run maps=" + taskTracker.countMapTasks() + ", max reds=" +
+ taskTracker.getMaxReduceTasks() + ", run reds=" +
+ taskTracker.countReduceTasks() + ", map cap=" +
+ mapClusterCapacity + ", red cap = " +
+ reduceClusterCapacity);
+
+ /*
+ * update all our QSI objects.
+ * This involves updating each qsi structure. This operation depends
+ * on the number of running jobs in a queue, and some waiting jobs. If it
+ * becomes expensive, do it once every few heartbeats only.
+ */
+ updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+ // make sure we get our map or reduce scheduling object to update its
+ // collection of QSI objects too.
if ((maxReduceTasks - currentReduceTasks) >
(maxMapTasks - currentMapTasks)) {
// get a reduce task first
+ reduceScheduler.updateCollectionOfQSIs();
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
@@ -1337,6 +1382,7 @@
// if we didn't get any, look at map tasks, if TT has space
else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+ mapScheduler.updateCollectionOfQSIs();
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
@@ -1346,6 +1392,7 @@
}
else {
// get a map task first
+ mapScheduler.updateCollectionOfQSIs();
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
@@ -1359,6 +1406,7 @@
// if we didn't get any, look at reduce tasks, if TT has space
else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+ reduceScheduler.updateCollectionOfQSIs();
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
tlr.getLookUpStatus()) {
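The relocated updateQSIObjects() keeps the approximate counting scheme described in its javadoc: per-queue counters are reset through resetTaskVars(), running tasks are accumulated per user over the running-job queue, and pending-task counting over waiting jobs stops once the count already exceeds cluster capacity, since a larger value cannot change a scheduling decision. The following standalone sketch shows that scheme with simplified, assumed data structures; the Job stub and field names are illustrative, not the Hadoop classes.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class QsiCountingSketch {
  // illustrative stand-in for JobInProgress; not the Hadoop class
  static class Job {
    final String user;
    final int runningTasks;
    final int pendingTasks;
    Job(String user, int runningTasks, int pendingTasks) {
      this.user = user;
      this.runningTasks = runningTasks;
      this.pendingTasks = pendingTasks;
    }
  }

  int numRunningTasks;
  int numPendingTasks;
  final Map<String, Integer> numRunningTasksByUser =
      new HashMap<String, Integer>();

  void update(List<Job> runningJobs, List<Job> waitingJobs,
              int clusterCapacity) {
    // reset counters, as resetTaskVars() does in the patch
    numRunningTasks = 0;
    numPendingTasks = 0;
    numRunningTasksByUser.clear();

    // walk running jobs: accumulate totals and per-user running tasks
    for (Job j : runningJobs) {
      numRunningTasks += j.runningTasks;
      Integer soFar = numRunningTasksByUser.get(j.user);
      numRunningTasksByUser.put(j.user,
          (soFar == null ? 0 : soFar) + j.runningTasks);
      numPendingTasks += j.pendingTasks;
    }

    // walk waiting jobs: an exact pending count is unnecessary beyond the
    // cluster capacity, so stop early
    for (Job j : waitingJobs) {
      if (numPendingTasks > clusterCapacity) {
        break; // that's plenty; no need for more computation
      }
      numPendingTasks += j.pendingTasks;
    }
  }
}

The real method additionally recomputes guaranteedCapacity only when the cluster's slot counts have changed since the previous call, tracked via prevMapClusterCapacity and prevReduceClusterCapacity.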
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=734597&r1=734596&r2=734597&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Jan 14 18:55:57 2009
@@ -1525,7 +1525,7 @@
//Get scheduling information, now the number of waiting job should have
//changed to 4 as one is scheduled and has become running.
// make sure we update our stats
- scheduler.updateQSIInfo();
+ scheduler.updateQSIInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1537,7 +1537,7 @@
//assign a reduce task
Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
// make sure we update our stats
- scheduler.updateQSIInfo();
+ scheduler.updateQSIInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1553,7 +1553,7 @@
taskTrackerManager.finalizeJob(u1j1);
// make sure we update our stats
- scheduler.updateQSIInfo();
+ scheduler.updateQSIInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1570,7 +1570,7 @@
//Run initializer to clean up failed jobs
p.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfo();
+ scheduler.updateQSIInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1588,7 +1588,7 @@
//run initializer to clean up failed job
p.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfo();
+ scheduler.updateQSIInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1613,7 +1613,7 @@
//one. run the poller as it is responsible for waiting count
p.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfo();
+ scheduler.updateQSIInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1625,7 +1625,7 @@
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
// make sure we update our stats
- scheduler.updateQSIInfo();
+ scheduler.updateQSIInfoForTests();
//Now running counts should become zero
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();