MAPREDUCE-6647. MR usage counters use the resources requested instead of the 
resources allocated (haibochen via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3be1ab48
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3be1ab48
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3be1ab48

Branch: refs/heads/HDFS-1312
Commit: 3be1ab485f557c8a3c6a5066453f24d8d61f30be
Parents: 93bacda
Author: Robert Kanter <rkan...@apache.org>
Authored: Wed Apr 6 17:15:43 2016 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Wed Apr 6 17:15:43 2016 -0700

----------------------------------------------------------------------
 .../v2/app/job/impl/TaskAttemptImpl.java        | 41 ++++++++++++--------
 .../apache/hadoop/mapreduce/v2/app/MRApp.java   | 10 ++++-
 .../hadoop/mapreduce/v2/app/TestRecovery.java   | 29 ++++++++------
 .../v2/app/job/impl/TestTaskAttempt.java        | 37 +++++++++++-------
 4 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be1ab48/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 8fff7de..5f0a622 100755
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1406,29 +1406,36 @@ public abstract class TaskAttemptImpl implements
   
   private static void updateMillisCounters(JobCounterUpdateEvent jce,
       TaskAttemptImpl taskAttempt) {
-    TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+    // if container/resource is not allocated, do not update
+    if (null == taskAttempt.container ||
+        null == taskAttempt.container.getResource()) {
+      return;
+    }
     long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
-    int mbRequired =
-        taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
-    int vcoresRequired = taskAttempt.getCpuRequired(taskAttempt.conf, taskType);
-
+    Resource allocatedResource = taskAttempt.container.getResource();
+    int mbAllocated = allocatedResource.getMemory();
+    int vcoresAllocated = allocatedResource.getVirtualCores();
     int minSlotMemSize = taskAttempt.conf.getInt(
-      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-
-    int simSlotsRequired =
-        minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) mbRequired
-            / minSlotMemSize);
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    int simSlotsAllocated = minSlotMemSize == 0 ? 0 :
+        (int) Math.ceil((float) mbAllocated / minSlotMemSize);
 
+    TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     if (taskType == TaskType.MAP) {
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, simSlotsRequired * duration);
-      jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbRequired);
-      jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, duration * vcoresRequired);
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS,
+          simSlotsAllocated * duration);
+      jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbAllocated);
+      jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS,
+          duration * vcoresAllocated);
       jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration);
     } else {
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, simSlotsRequired * duration);
-      jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, duration * mbRequired);
-      jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, duration * vcoresRequired);
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES,
+          simSlotsAllocated * duration);
+      jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES,
+          duration * mbAllocated);
+      jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES,
+          duration * vcoresAllocated);
       jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be1ab48/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index fd0e8e2..b43a7b4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -111,6 +111,10 @@ import org.junit.Assert;
 public class MRApp extends MRAppMaster {
   private static final Log LOG = LogFactory.getLog(MRApp.class);
 
+  /**
+   * The available resource of each container allocated.
+   */
+  private Resource resource;
   int maps;
   int reduces;
 
@@ -250,6 +254,7 @@ public class MRApp extends MRAppMaster {
     // the job can reaches the final state when MRAppMaster shuts down.
     this.successfullyUnregistered.set(unregistered);
     this.assignedQueue = assignedQueue;
+    this.resource = Resource.newInstance(1234, 2);
   }
 
   @Override
@@ -589,7 +594,6 @@ public class MRApp extends MRAppMaster {
             ContainerId.newContainerId(getContext().getApplicationAttemptId(),
               containerCount++);
         NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
-        Resource resource = Resource.newInstance(1234, 2);
         ContainerTokenIdentifier containerTokenIdentifier =
             new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
             resource, System.currentTimeMillis() + 10000, 42, 42,
@@ -712,6 +716,10 @@ public class MRApp extends MRAppMaster {
     }
   }
 
+  public void setAllocatedContainerResource(Resource resource) {
+    this.resource = resource;
+  }
+
   class TestJob extends JobImpl {
     //override the init transition
     private final TestInitTransition initTransition = new TestInitTransition(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be1ab48/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 504a5f7..0f11238 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -1744,19 +1744,24 @@ public class TestRecovery {
         expectedJobHistoryEvents.remove(0);
       }  else if (current instanceof JobCounterUpdateEvent) {
         JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
-
-        LOG.info("JobCounterUpdateEvent "
-            + jcue.getCounterUpdates().get(0).getCounterKey()
-            + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
-        if (jcue.getCounterUpdates().get(0).getCounterKey() ==
-            JobCounter.NUM_FAILED_MAPS) {
-          totalFailedMaps += jcue.getCounterUpdates().get(0)
-              .getIncrementValue();
-        } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
-            JobCounter.TOTAL_LAUNCHED_MAPS) {
-          totalLaunchedMaps += jcue.getCounterUpdates().get(0)
-              .getIncrementValue();
+        boolean containsUpdates = jcue.getCounterUpdates().size() > 0;
+        // there are no updates in a JobCounterUpdateEvent emitted on
+        // TaskAttempt recovery. Check that first.
+        if(containsUpdates) {
+          LOG.info("JobCounterUpdateEvent "
+              + jcue.getCounterUpdates().get(0).getCounterKey()
+              + " " + jcue.getCounterUpdates().get(0).getIncrementValue());
+          if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+              JobCounter.NUM_FAILED_MAPS) {
+            totalFailedMaps += jcue.getCounterUpdates().get(0)
+                .getIncrementValue();
+          } else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
+              JobCounter.TOTAL_LAUNCHED_MAPS) {
+            totalLaunchedMaps += jcue.getCounterUpdates().get(0)
+                .getIncrementValue();
+          }
         }
+
       } else if (current instanceof JobTaskEvent) {
         JobTaskEvent jte = (JobTaskEvent) current;
         assertEquals(jte.getState(), finalState);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be1ab48/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 5a66061..509f6af 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -33,6 +33,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -41,9 +47,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -82,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
@@ -250,22 +259,21 @@ public class TestTaskAttempt{
 
   @Test
   public void testMillisCountersUpdate() throws Exception {
-    verifyMillisCounters(2048, 2048, 1024);
-    verifyMillisCounters(2048, 1024, 1024);
-    verifyMillisCounters(10240, 1024, 2048);
+    verifyMillisCounters(Resource.newInstance(1024, 1), 512);
+    verifyMillisCounters(Resource.newInstance(2048, 4), 1024);
+    verifyMillisCounters(Resource.newInstance(10240, 8), 2048);
   }
 
-  public void verifyMillisCounters(int mapMemMb, int reduceMemMb,
+  public void verifyMillisCounters(Resource containerResource,
       int minContainerSize) throws Exception {
     Clock actualClock = SystemClock.getInstance();
     ControlledClock clock = new ControlledClock(actualClock);
     clock.setTime(10);
     MRApp app =
         new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
+    app.setAllocatedContainerResource(containerResource);
     Configuration conf = new Configuration();
-    conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
-    conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
       minContainerSize);
     app.setClusterInfo(new ClusterInfo(Resource.newInstance(10240, 1)));
 
@@ -300,21 +308,24 @@ public class TestTaskAttempt{
     Assert.assertEquals(rta.getFinishTime(), 11);
     Assert.assertEquals(rta.getLaunchTime(), 10);
     Counters counters = job.getAllCounters();
-    Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
+
+    int memoryMb = containerResource.getMemory();
+    int vcores = containerResource.getVirtualCores();
+    Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
         counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
-    Assert.assertEquals((int) Math.ceil((float) reduceMemMb / minContainerSize),
+    Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
         counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue());
     Assert.assertEquals(1,
         counters.findCounter(JobCounter.MILLIS_MAPS).getValue());
     Assert.assertEquals(1,
         counters.findCounter(JobCounter.MILLIS_REDUCES).getValue());
-    Assert.assertEquals(mapMemMb,
+    Assert.assertEquals(memoryMb,
         counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue());
-    Assert.assertEquals(reduceMemMb,
+    Assert.assertEquals(memoryMb,
         counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue());
-    Assert.assertEquals(1,
+    Assert.assertEquals(vcores,
         counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue());
-    Assert.assertEquals(1,
+    Assert.assertEquals(vcores,
         counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue());
   }
 

Reply via email to