Repository: hadoop
Updated Branches:
  refs/heads/HDFS-9806 c37346d0e -> 1eae719bc


MAPREDUCE-6541. Exclude scheduled reducer memory when calculating available 
mapper slots from headroom to avoid deadlock. Contributed by Varun Saxena


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/060558c6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/060558c6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/060558c6

Branch: refs/heads/HDFS-9806
Commit: 060558c6f221ded0b014189d5b82eee4cc7b576b
Parents: 8bd6d77
Author: Naganarasimha <[email protected]>
Authored: Thu Oct 27 18:03:13 2016 +0530
Committer: Naganarasimha <[email protected]>
Committed: Thu Oct 27 18:03:13 2016 +0530

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         |  24 ++--
 .../v2/app/rm/TestRMContainerAllocator.java     | 122 +++++++++++++++++++
 2 files changed, 137 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/060558c6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 4cb3cbe..e3b673a 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -338,6 +338,12 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     return scheduledRequests;
   }
 
+  @Private
+  @VisibleForTesting
+  int getNumOfPendingReduces() {
+    return pendingReduces.size();
+  }
+
   public boolean getIsReduceStarted() {
     return reduceStarted;
   }
@@ -521,15 +527,20 @@ public class RMContainerAllocator extends 
RMContainerRequestor
     }
 
     // The pending mappers haven't been waiting for too long. Let us see if
-    // the headroom can fit a mapper.
-    Resource availableResourceForMap = getAvailableResources();
+    // there are enough resources for a mapper to run. This is calculated by
+    // excluding scheduled reducers from headroom and comparing it against
+    // resources required to run one mapper.
+    Resource scheduledReducesResource = Resources.multiply(
+         reduceResourceRequest, scheduledRequests.reduces.size());
+    Resource availableResourceForMap =
+         Resources.subtract(getAvailableResources(), scheduledReducesResource);
     if 
(ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
         mapResourceRequest, getSchedulerResourceTypes()) > 0) {
-      // the available headroom is enough to run a mapper
+       // Enough room to run a mapper
       return false;
     }
 
-    // Available headroom is not enough to run mapper. See if we should hold
+    // Available resources are not enough to run mapper. See if we should hold
     // off before preempting reducers and preempt if okay.
     return 
preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
   }
@@ -990,11 +1001,6 @@ public class RMContainerAllocator extends 
RMContainerRequestor
       Resources.add(assignedMapResource, assignedReduceResource));
   }
 
-  @VisibleForTesting
-  public int getNumOfPendingReduces() {
-    return pendingReduces.size();
-  }
-
   @Private
   @VisibleForTesting
   class ScheduledRequests {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/060558c6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 38a9731..bcce793 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -3190,6 +3190,128 @@ public class TestRMContainerAllocator {
     }
   }
 
+  /**
+   * Tests whether scheduled reducers are excluded from headroom while
+   * calculating headroom.
+   */
+  @Test
+  public void testExcludeSchedReducesFromHeadroom() throws Exception {
+    LOG.info("Running testExcludeSchedReducesFromHeadroom");
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, 
-1);
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    Task mockTask = mock(Task.class);
+    TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
+    when(mockJob.getTask((TaskId)any())).thenReturn(mockTask);
+    
when(mockTask.getAttempt((TaskAttemptId)any())).thenReturn(mockTaskAttempt);
+    when(mockTaskAttempt.getProgress()).thenReturn(0.01f);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    MockNM nodeManager = rm.registerNode("h1:1234", 4096);
+    dispatcher.await();
+    // Register nodes to RM.
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 1024);
+    dispatcher.await();
+
+    // Request 2 maps and 1 reducer(sone on nodes which are not registered).
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 1024, new String[] { "h2" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 =
+         createReq(jobId, 3, 1024, new String[] { "h1" }, false, true);
+    allocator.sendRequest(event3);
+
+    // This will tell the scheduler about the requests but there will be no
+    // allocations as nodes are not added.
+    allocator.schedule();
+    dispatcher.await();
+
+    // Request for another reducer on h3 which has not registered.
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
+    allocator.sendRequest(event4);
+
+    allocator.schedule();
+    dispatcher.await();
+
+    // Update resources in scheduler through node heartbeat from h1.
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3));
+    allocator.schedule();
+    dispatcher.await();
+
+    // Two maps are assigned.
+    Assert.assertEquals(2, allocator.getAssignedRequests().maps.size());
+    // Send deallocate request for map so that no maps are assigned after this.
+    ContainerAllocatorEvent deallocate1 = createDeallocateEvent(jobId, 1, 
false);
+    allocator.sendDeallocate(deallocate1);
+    ContainerAllocatorEvent deallocate2 = createDeallocateEvent(jobId, 2, 
false);
+    allocator.sendDeallocate(deallocate2);
+    // No map should be assigned.
+    Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
+
+    nodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
+    allocator.schedule();
+    dispatcher.await();
+
+    // h2 heartbeats.
+    nodeManager2.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Send request for one more mapper.
+    ContainerRequestEvent event5 =
+        createReq(jobId, 5, 1024, new String[] { "h1" });
+    allocator.sendRequest(event5);
+
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
+    allocator.schedule();
+    dispatcher.await();
+    // One reducer is assigned and one map is scheduled
+    Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
+    Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size());
+    // Headroom enough to run a mapper if headroom is taken as it is but wont 
be
+    // enough if scheduled reducers resources are deducted.
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2));
+    allocator.schedule();
+    dispatcher.await();
+    // After allocate response, the one assigned reducer is preempted and 
killed
+    Assert.assertEquals(1, 
MyContainerAllocator.getTaskAttemptKillEvents().size());
+    Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC,
+        MyContainerAllocator.getTaskAttemptKillEvents().get(0).getMessage());
+    Assert.assertEquals(1, allocator.getNumOfPendingReduces());
+  }
+
   private static class MockScheduler implements ApplicationMasterProtocol {
     ApplicationAttemptId attemptId;
     long nextContainerId = 10;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to