MAPREDUCE-6771. RMContainerAllocator sends container diagnostics event after 
corresponding completion event. Contributed by Haibo Chen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a1b8251b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a1b8251b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a1b8251b

Branch: refs/heads/HDFS-10467
Commit: a1b8251bf7a7e9b776c4483fa01f7d453420eba4
Parents: 2ae5a3a
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 29 15:27:17 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 29 15:27:17 2016 +0000

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         | 51 ++++++++++++--------
 .../v2/app/rm/TestRMContainerAllocator.java     | 46 ++++++++++++++++++
 2 files changed, 77 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1b8251b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index ecd75db..4cb3cbe 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -150,7 +150,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     new LinkedList<ContainerRequest>();
 
   //holds information about the assigned containers to task attempts
-  private final AssignedRequests assignedRequests = new AssignedRequests();
+  private final AssignedRequests assignedRequests;
   
   //holds scheduled requests to be fulfilled by RM
   private final ScheduledRequests scheduledRequests = new ScheduledRequests();
@@ -200,6 +200,11 @@ public class RMContainerAllocator extends RMContainerRequestor
     this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
     this.clock = context.getClock();
+    this.assignedRequests = createAssignedRequests();
+  }
+
+  protected AssignedRequests createAssignedRequests() {
+    return new AssignedRequests();
   }
 
   @Override
@@ -833,29 +838,35 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
     for (ContainerStatus cont : finishedContainers) {
-      LOG.info("Received completed container " + cont.getContainerId());
-      TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
-      if (attemptID == null) {
-        LOG.error("Container complete event for unknown container id "
-            + cont.getContainerId());
-      } else {
-        pendingRelease.remove(cont.getContainerId());
-        assignedRequests.remove(attemptID);
-        
-        // send the container completed event to Task attempt
-        eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
-        
-        // Send the diagnostics
-        String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
-        eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
-            diagnostics));
-
-        preemptionPolicy.handleCompletedContainer(attemptID);
-      }
+      processFinishedContainer(cont);
     }
     return newContainers;
   }
 
+  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  void processFinishedContainer(ContainerStatus container) {
+    LOG.info("Received completed container " + container.getContainerId());
+    TaskAttemptId attemptID = assignedRequests.get(container.getContainerId());
+    if (attemptID == null) {
+      LOG.error("Container complete event for unknown container "
+          + container.getContainerId());
+    } else {
+      pendingRelease.remove(container.getContainerId());
+      assignedRequests.remove(attemptID);
+
+      // Send the diagnostics
+      String diagnostic = StringInterner.weakIntern(container.getDiagnostics());
+      eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
+          diagnostic));
+
+      // send the container completed event to Task attempt
+      eventHandler.handle(createContainerFinishedEvent(container, attemptID));
+
+      preemptionPolicy.handleCompletedContainer(attemptID);
+    }
+  }
+
   private void applyConcurrentTaskLimits() {
     int numScheduledMaps = scheduledRequests.maps.size();
     if (maxRunningMaps > 0 && numScheduledMaps > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1b8251b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index a115b13..38a9731 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -70,11 +71,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -144,6 +147,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Supplier;
+import org.mockito.InOrder;
 
 @SuppressWarnings("unchecked")
 public class TestRMContainerAllocator {
@@ -3017,6 +3021,48 @@ public class TestRMContainerAllocator {
     }
   }
 
+  /**
+   * MAPREDUCE-6771. Test if RMContainerAllocator generates the events in the
+   * right order while processing finished containers.
+   */
+  @Test
+  public void testHandlingFinishedContainers() {
+    EventHandler eventHandler = mock(EventHandler.class);
+
+    AppContext context = mock(RunningAppContext.class);
+    when(context.getClock()).thenReturn(new ControlledClock());
+    when(context.getClusterInfo()).thenReturn(
+        new ClusterInfo(Resource.newInstance(10240, 1)));
+    when(context.getEventHandler()).thenReturn(eventHandler);
+    RMContainerAllocator containerAllocator =
+        new RMContainerAllocatorForFinishedContainer(null, context,
+            mock(AMPreemptionPolicy.class));
+
+    ContainerStatus finishedContainer = ContainerStatus.newInstance(
+        mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
+    containerAllocator.processFinishedContainer(finishedContainer);
+
+    InOrder inOrder = inOrder(eventHandler);
+    inOrder.verify(eventHandler).handle(
+        isA(TaskAttemptDiagnosticsUpdateEvent.class));
+    inOrder.verify(eventHandler).handle(isA(TaskAttemptEvent.class));
+    inOrder.verifyNoMoreInteractions();
+  }
+
+  private static class RMContainerAllocatorForFinishedContainer
+      extends RMContainerAllocator {
+    public RMContainerAllocatorForFinishedContainer(ClientService clientService,
+        AppContext context, AMPreemptionPolicy preemptionPolicy) {
+      super(clientService, context, preemptionPolicy);
+    }
+    @Override
+    protected AssignedRequests createAssignedRequests() {
+      AssignedRequests assignedReqs = mock(AssignedRequests.class);
+      TaskAttemptId taskAttempt = mock(TaskAttemptId.class);
+      when(assignedReqs.get(any(ContainerId.class))).thenReturn(taskAttempt);
+      return assignedReqs;
+    }
+  }
 
   @Test
   public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to