Repository: hadoop
Updated Branches:
  refs/heads/trunk b0aff8a96 -> 7eb783e26


YARN-8127. Resource leak when async scheduling is enabled. Contributed by Tao Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7eb783e2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7eb783e2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7eb783e2

Branch: refs/heads/trunk
Commit: 7eb783e2634d8c11fb646f1f2fdf597336325312
Parents: b0aff8a
Author: Weiwei Yang <w...@apache.org>
Authored: Wed Apr 11 17:15:25 2018 +0800
Committer: Weiwei Yang <w...@apache.org>
Committed: Wed Apr 11 17:15:25 2018 +0800

----------------------------------------------------------------------
 .../scheduler/common/fica/FiCaSchedulerApp.java | 10 +++
 .../TestCapacitySchedulerAsyncScheduling.java   | 91 ++++++++++++++++++++
 2 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7eb783e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 32b2cad..3ec8191 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -339,6 +339,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         return false;
       }
     }
+    // If allocate from reserved container, make sure node is still reserved
+    if (allocation.getAllocateFromReservedContainer() != null
+        && reservedContainerOnNode == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Try to allocate from reserved container " + allocation
+            .getAllocateFromReservedContainer().getRmContainer()
+            .getContainerId() + ", but node is not reserved");
+      }
+      return false;
+    }
 
     // Do we have enough space on this node?
     Resource availableResource = Resources.clone(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7eb783e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 18cd942..338b9f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -594,6 +594,97 @@ public class TestCapacitySchedulerAsyncScheduling {
     }
   }
 
+  // YARN-8127: duplicate allocate-from-reserved commits must apply only once
+  @Test (timeout = 30000)
+  public void testCommitDuplicatedAllocateFromReservedProposals()
+      throws Exception {
+    // Disable async scheduling; the test triggers scheduling manually
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // Start the RM and register two NMs with 8 * GB each
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+    rm.registerNode("192.168.0.2:2234", 8 * GB);
+
+    // Wait until the scheduler's node tracker has both nodes
+    while (
+        ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
+            .nodeCount() < 2) {
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(2,
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount());
+    CapacityScheduler cs =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    // Submit the app and launch/register its AM on nm1
+    RMApp app = rm.submitApp(1 * GB, "app", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    FiCaSchedulerApp schedulerApp =
+        cs.getApplicationAttempt(am.getApplicationAttemptId());
+
+    // App asks for 1 * 6G container;
+    // nm1 then runs 2 containers (container_01/AM, container_02), 1G left
+    allocateAndLaunchContainers(am, nm1, rm, 1,
+        Resources.createResource(6 * GB), 0, 2);
+    Assert.assertEquals(2, sn1.getNumContainers());
+    Assert.assertEquals(1 * GB, sn1.getUnallocatedResource().getMemorySize());
+
+    // App asks for 5 * 2G containers; with only 1G free on nm1,
+    // the node update makes nm1 reserve 1 * 2G container
+    am.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(0), "*",
+            Resources.createResource(2 * GB), 5)), null);
+    cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+    Assert.assertEquals(1, schedulerApp.getReservedContainers().size());
+
+    // RM kills the 6G container_02 (every container whose id is not 1)
+    for (RMContainer rmContainer : sn1.getCopiedListOfRunningContainers()) {
+      if (rmContainer.getContainerId().getContainerId() != 1) {
+        cs.completedContainer(rmContainer, ContainerStatus
+                .newInstance(rmContainer.getContainerId(),
+                    ContainerState.COMPLETE, "",
+                    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+            RMContainerEventType.KILL);
+      }
+    }
+    Assert.assertEquals(7 * GB, sn1.getUnallocatedResource().getMemorySize());
+
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    // Intercept CapacityScheduler#tryCommit on the spy: for a proposal that
+    // allocates from a reserved container, commit it three times on the real
+    // scheduler to simulate the duplicated proposals seen in YARN-8127
+    Mockito.doAnswer(new Answer<Object>() {
+      public Boolean answer(InvocationOnMock invocation) throws Exception {
+        ResourceCommitRequest request =
+            (ResourceCommitRequest) invocation.getArguments()[1];
+        if (request.getFirstAllocatedOrReservedContainer()
+            .getAllocateFromReservedContainer() != null) {
+          for (int i=0; i<3; i++) {
+            cs.tryCommit((Resource) invocation.getArguments()[0],
+                (ResourceCommitRequest) invocation.getArguments()[1],
+                (Boolean) invocation.getArguments()[2]);
+          }
+          Assert.assertEquals(2, 
sn1.getCopiedListOfRunningContainers().size());
+          Assert.assertEquals(5 * GB,
+              sn1.getUnallocatedResource().getMemorySize());
+        }
+        return true;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
+
+    spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+
+    rm.stop();
+  }
+
   private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
     if (nmHeartbeatThread != null) {
       nmHeartbeatThread.setShouldStop();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to