Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d462e4833 -> 9b1abb448


YARN-6714. IllegalStateException while handling APP_ATTEMPT_REMOVED event when 
async-scheduling enabled in CapacityScheduler. Contributed by Tao Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9b1abb44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9b1abb44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9b1abb44

Branch: refs/heads/branch-2
Commit: 9b1abb448d3274c825c278397063ad6393cfa440
Parents: d462e48
Author: Sunil G <sun...@apache.org>
Authored: Mon Jul 17 13:22:04 2017 +0530
Committer: Sunil G <sun...@apache.org>
Committed: Mon Jul 17 13:22:04 2017 +0530

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   |   5 +-
 .../TestCapacitySchedulerAsyncScheduling.java   | 154 +++++++++++++++++++
 2 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b1abb44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e586aa3..3179d07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2414,7 +2414,10 @@ public class CapacityScheduler extends
 
     if (attemptId != null) {
       FiCaSchedulerApp app = getApplicationAttempt(attemptId);
-      if (app != null) {
+      // Required sanity check for attemptId: when async-scheduling is
+      // enabled, a proposal may be outdated if an AM failover just finished
+      // before the proposal queue was consumed
+      if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
         if (app.accept(cluster, request)) {
           app.apply(cluster, request);
           LOG.info("Allocation proposal accepted");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b1abb44/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index b2cf805..fdd320a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -18,7 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -26,12 +31,30 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class TestCapacitySchedulerAsyncScheduling {
@@ -141,4 +164,135 @@ public class TestCapacitySchedulerAsyncScheduling {
 
     rm.close();
   }
+
+  // Testcase for YARN-6714: a proposal from a removed attempt must be skipped
+  @Test (timeout = 30000)
+  public void testCommitProposalForFailedAppAttempt()
+      throws Exception {
+    // disable async-scheduling; the outdated proposal is committed manually
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+    List<MockNM> nmLst = new ArrayList<>();
+    nmLst.add(nm1);
+    nmLst.add(nm2);
+
+    // wait until the scheduler tracks both nodes
+    while (
+        ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
+            .nodeCount() < 2) {
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(2,
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount());
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
+    SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId());
+
+    // launch app; the AM of its first attempt is registered on nm1
+    RMApp app = rm.submitApp(200, "app", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    FiCaSchedulerApp schedulerApp =
+        scheduler.getApplicationAttempt(am.getApplicationAttemptId());
+
+    // allocate 1 container (5GB) for attempt1 and move it to RUNNING on nm2
+    allocateAndLaunchContainers(am, nm2, rm, 1,
+        Resources.createResource(5 * GB), 0, 2);
+
+    // nm1 runs 1 container(app1-container_01/AM)
+    // nm2 runs 1 container(app1-container_02)
+    Assert.assertEquals(1, sn1.getNumContainers());
+    Assert.assertEquals(1, sn2.getNumContainers());
+
+    // kill app attempt1
+    scheduler.handle(
+        new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(),
+            RMAppAttemptState.KILLED, true));
+    // wait until attempt1's AM container is no longer running on nm1
+    while (sn1.getCopiedListOfRunningContainers().size() == 1) {
+      Thread.sleep(100);
+    }
+    // heartbeat nm1 until a container (presumably attempt2's AM) runs there
+    while (sn1.getCopiedListOfRunningContainers().size() == 0) {
+      nm1.nodeHeartbeat(true);
+      Thread.sleep(100);
+    }
+
+    // build a reservation proposal for the stopped attempt1, as an
+    // outdated async-scheduling proposal would be; tryCommit must skip it,
+    // so no container may end up reserved on nm2
+    Resource reservedResource = Resources.createResource(5 * GB);
+    Container container = Container.newInstance(
+        ContainerId.newContainerId(am.getApplicationAttemptId(), 3),
+        sn2.getNodeID(), sn2.getHttpAddress(), reservedResource,
+        Priority.newInstance(0), null);
+    RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey
+        .create(ResourceRequest
+            .newInstance(Priority.newInstance(0), "*", reservedResource, 1)),
+        am.getApplicationAttemptId(), sn2.getNodeID(), "user",
+        rm.getRMContext());
+    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> reservedContainer =
+        new SchedulerContainer<>(
+            schedulerApp, scheduler.getNode(sn2.getNodeID()), rmContainer, "",
+            false);
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+        reservedForAttempt1Proposal =
+        new ContainerAllocationProposal<>(
+            reservedContainer, null, reservedContainer, NodeType.OFF_SWITCH,
+            NodeType.OFF_SWITCH, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            reservedResource);
+    List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
+        reservedProposals = new ArrayList<>();
+    reservedProposals.add(reservedForAttempt1Proposal);
+    ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
+        new ResourceCommitRequest<>(null,
+            reservedProposals, null);
+    scheduler.tryCommit(scheduler.getClusterResource(), request);
+    Assert.assertNull("Outdated proposal should not be accepted!",
+        sn2.getReservedContainer());
+
+    rm.stop();
+  }
+
+  // Requests nContainer containers and drives each one to RUNNING
+  private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
+      int nContainer, Resource resource, int priority, int startContainerId)
+      throws Exception {
+    am.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(priority), "*", resource,
+            nContainer)), null);
+    ContainerId lastContainerId = ContainerId
+        .newContainerId(am.getApplicationAttemptId(),
+            startContainerId + nContainer - 1);
+    Assert.assertTrue(
+        rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED));
+    // Acquire them, then send LAUNCHED and wait for RUNNING per container
+    am.allocate(null, null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    for (int cId = startContainerId;
+         cId < startContainerId + nContainer; cId++) {
+      ContainerId containerId =
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
+      RMContainer rmContainer = cs.getRMContainer(containerId);
+      if (rmContainer != null) {
+        rmContainer.handle(
+            new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+      } else {
+        Assert.fail("Cannot find RMContainer");
+      }
+      rm.waitForState(nm,
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId),
+          RMContainerState.RUNNING);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to