Author: mattf
Date: Mon May  7 06:27:06 2012
New Revision: 1334880

URL: http://svn.apache.org/viewvc?rev=1334880&view=rev
Log:
MAPREDUCE-1238 merged to branch-1.0 for 1.0.3

Modified:
    hadoop/common/branches/branch-1.0/CHANGES.txt   (contents, props changed)
    
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1334880&r1=1334879&r2=1334880&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Mon May  7 06:27:06 2012
@@ -80,6 +80,9 @@ Release 1.0.3 - unreleased
     MAPREDUCE-3857. Grep example ignores mapred.job.queue.name.
     (Jonathan Eagles via mattf)
 
+    MAPREDUCE-1238. mapred metrics shows negative count of waiting maps and
+    reduces (tgraves via bobby)
+
 Release 1.0.2 - 2012.03.24
 
   NEW FEATURES

Propchange: hadoop/common/branches/branch-1.0/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1311966

Modified: 
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1334880&r1=1334879&r2=1334880&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
 (original)
+++ 
hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
 Mon May  7 06:27:06 2012
@@ -732,7 +732,7 @@ public class JobInProgress {
     if (numMapTasks > 0) { 
       nonRunningMapCache = createCache(splits, maxLevel);
     }
-        
+
     // set the launch time
     this.launchTime = jobtracker.getClock().getTime();
 
@@ -789,12 +789,15 @@ public class JobInProgress {
     
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
+
+      // set this before the throw to make sure cleanup works properly
+      tasksInited = true;
+
       if(jobInitKillStatus.killed) {
         throw new KillInterruptedException("Job " + jobId + " killed in init");
       }
     }
     
-    tasksInited = true;
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
                                  numMapTasks, numReduceTasks);
     
@@ -3204,11 +3207,16 @@ public class JobInProgress {
       // Cancel task tracker reservation
       cancelReservedSlots();
 
+      //  Waiting metrics are incremented in JobInProgress.initTasks()
+      //  If a job gets an exception before that, we do not want to
+      //  incorrectly decrement.
+      if (tasksInited) {
+        jobtracker.getInstrumentation().decWaitingMaps(getJobID(), 
pendingMaps());
+        jobtracker.getInstrumentation().decWaitingReduces(getJobID(), 
pendingReduces());
+        this.queueMetrics.decWaitingMaps(getJobID(), pendingMaps());
+        this.queueMetrics.decWaitingReduces(getJobID(), pendingReduces());
+      }
       // Let the JobTracker know that a job is complete
-      jobtracker.getInstrumentation().decWaitingMaps(getJobID(), 
pendingMaps());
-      jobtracker.getInstrumentation().decWaitingReduces(getJobID(), 
pendingReduces());
-      this.queueMetrics.decWaitingMaps(getJobID(), pendingMaps());
-      this.queueMetrics.decWaitingReduces(getJobID(), pendingReduces());
       jobtracker.storeCompletedJob(this);
       jobtracker.finalizeJob(this);
 


Reply via email to