Author: tgraves
Date: Tue Aug 28 21:28:39 2012
New Revision: 1378357

URL: http://svn.apache.org/viewvc?rev=1378357&view=rev
Log:
MAPREDUCE-1684. ClusterStatus can be cached in 
CapacityTaskScheduler.assignTasks() (Koji Noguchi via tgraves)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1378357&r1=1378356&r2=1378357&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Aug 28 21:28:39 2012
@@ -213,6 +213,9 @@ Release 1.2.0 - unreleased
     MAPREDUCE-4595. TestLostTracker failing - possibly due to a race in 
     JobHistory.JobHistoryFilesManager#run() (kkambatl via tucu)
 
+    MAPREDUCE-1684. ClusterStatus can be cached in 
+    CapacityTaskScheduler.assignTasks() (Koji Noguchi via tgraves)
+
 Release 1.1.0 - unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1378357&r1=1378356&r2=1378357&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Aug 28 21:28:39 2012
@@ -154,7 +154,8 @@ class CapacityTaskScheduler extends Task
     protected TaskType type = null;
 
     abstract TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
-        JobInProgress job, boolean assignOffSwitch) throws IOException;
+        JobInProgress job, boolean assignOffSwitch,
+        ClusterStatus clusterStatus) throws IOException;
 
     int getSlotsOccupied(JobInProgress job) {
       return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * 
@@ -293,7 +294,8 @@ class CapacityTaskScheduler extends Task
     private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
                                               int availableSlots,
                                               CapacitySchedulerQueue queue,
-                                              boolean assignOffSwitch)
+                                              boolean assignOffSwitch,
+                                              ClusterStatus clusterStatus)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
@@ -320,7 +322,8 @@ class CapacityTaskScheduler extends Task
                                                              availableSlots)) {
           // We found a suitable job. Get task from it.
           TaskLookupResult tlr = 
-            obtainNewTask(taskTrackerStatus, j, assignOffSwitch);
+            obtainNewTask(taskTrackerStatus, j, assignOffSwitch,
+                          clusterStatus);
           //if there is a task return it immediately.
           if (tlr.getLookUpStatus() == 
                   TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND || 
@@ -379,6 +382,11 @@ class CapacityTaskScheduler extends Task
 
       printQueues();
 
+      //MAPREDUCE-1684: somehow getClusterStatus seems to be expensive. Caching
+      //here to reuse during the scheduling
+      ClusterStatus clusterStatus =
+        scheduler.taskTrackerManager.getClusterStatus();
+
       // Check if this tasktracker has been reserved for a job...
       JobInProgress job = taskTracker.getJobForFallowSlot(type);
       if (job != null) {
@@ -397,7 +405,7 @@ class CapacityTaskScheduler extends Task
             // Don't care about locality!
             job.overrideSchedulingOpportunities();
           }
-          return obtainNewTask(taskTrackerStatus, job, true);
+          return obtainNewTask(taskTrackerStatus, job, true, clusterStatus);
         } else {
           // Re-reserve the current tasktracker
           taskTracker.reserveSlots(type, job, availableSlots);
@@ -420,7 +428,8 @@ class CapacityTaskScheduler extends Task
         }
         
         TaskLookupResult tlr = 
-          getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch);
+          getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch,
+                          clusterStatus);
         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
 
         if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
@@ -501,10 +510,10 @@ class CapacityTaskScheduler extends Task
 
     @Override
     TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
-                                   JobInProgress job, boolean assignOffSwitch) 
+                                   JobInProgress job, boolean assignOffSwitch,
+                                   ClusterStatus clusterStatus)
     throws IOException {
-      ClusterStatus clusterStatus = 
-        scheduler.taskTrackerManager.getClusterStatus();
+
       int numTaskTrackers = clusterStatus.getTaskTrackers();
       int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();
       
@@ -581,10 +590,9 @@ class CapacityTaskScheduler extends Task
 
     @Override
     TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
-                                   JobInProgress job, boolean unused) 
+                                   JobInProgress job, boolean unused,
+                                   ClusterStatus clusterStatus)
     throws IOException {
-      ClusterStatus clusterStatus = 
-        scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
       Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());


Reply via email to