Author: tgraves Date: Tue Aug 28 21:28:39 2012 New Revision: 1378357 URL: http://svn.apache.org/viewvc?rev=1378357&view=rev Log: MAPREDUCE-1684. ClusterStatus can be cached in CapacityTaskScheduler.assignTasks() (Koji Noguchi via tgraves)
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1378357&r1=1378356&r2=1378357&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Tue Aug 28 21:28:39 2012 @@ -213,6 +213,9 @@ Release 1.2.0 - unreleased MAPREDUCE-4595. TestLostTracker failing - possibly due to a race in JobHistory.JobHistoryFilesManager#run() (kkambatl via tucu) + MAPREDUCE-1684. ClusterStatus can be cached in + CapacityTaskScheduler.assignTasks() (Koji Noguchi via tgraves) + Release 1.1.0 - unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1378357&r1=1378356&r2=1378357&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Aug 28 21:28:39 2012 @@ -154,7 +154,8 @@ class CapacityTaskScheduler extends Task protected TaskType type = null; abstract TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, - JobInProgress job, boolean assignOffSwitch) throws IOException; + JobInProgress job, boolean assignOffSwitch, + ClusterStatus clusterStatus) throws IOException; int getSlotsOccupied(JobInProgress job) { return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * @@ -293,7 +294,8 @@ class CapacityTaskScheduler extends Task private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker, int availableSlots, CapacitySchedulerQueue queue, - boolean assignOffSwitch) + boolean assignOffSwitch, + ClusterStatus clusterStatus) throws IOException { TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); // we only look at jobs in the running queues, as these are the ones @@ -320,7 +322,8 @@ class CapacityTaskScheduler extends Task availableSlots)) { // We found a suitable job. Get task from it. TaskLookupResult tlr = - obtainNewTask(taskTrackerStatus, j, assignOffSwitch); + obtainNewTask(taskTrackerStatus, j, assignOffSwitch, + clusterStatus); //if there is a task return it immediately. if (tlr.getLookUpStatus() == TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND || @@ -379,6 +382,11 @@ class CapacityTaskScheduler extends Task printQueues(); + //MAPREDUCE-1684: somehow getClusterStatus seems to be expensive. Caching + //here to reuse during the scheduling + ClusterStatus clusterStatus = + scheduler.taskTrackerManager.getClusterStatus(); + // Check if this tasktracker has been reserved for a job... JobInProgress job = taskTracker.getJobForFallowSlot(type); if (job != null) { @@ -397,7 +405,7 @@ class CapacityTaskScheduler extends Task // Don't care about locality! job.overrideSchedulingOpportunities(); } - return obtainNewTask(taskTrackerStatus, job, true); + return obtainNewTask(taskTrackerStatus, job, true, clusterStatus); } else { // Re-reserve the current tasktracker taskTracker.reserveSlots(type, job, availableSlots); @@ -420,7 +428,8 @@ class CapacityTaskScheduler extends Task } TaskLookupResult tlr = - getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch); + getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch, + clusterStatus); TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus(); if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) { @@ -501,10 +510,10 @@ class CapacityTaskScheduler extends Task @Override TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, - JobInProgress job, boolean assignOffSwitch) + JobInProgress job, boolean assignOffSwitch, + ClusterStatus clusterStatus) throws IOException { - ClusterStatus clusterStatus = - scheduler.taskTrackerManager.getClusterStatus(); + int numTaskTrackers = clusterStatus.getTaskTrackers(); int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts(); @@ -581,10 +590,9 @@ class CapacityTaskScheduler extends Task @Override TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, - JobInProgress job, boolean unused) + JobInProgress job, boolean unused, + ClusterStatus clusterStatus) throws IOException { - ClusterStatus clusterStatus = - scheduler.taskTrackerManager.getClusterStatus(); int numTaskTrackers = clusterStatus.getTaskTrackers(); Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, scheduler.taskTrackerManager.getNumberOfUniqueHosts());