Repository: hadoop
Updated Branches:
  refs/heads/branch-3.1 991514f7c -> 734bc4228


YARN-8575. Avoid committing allocation proposals to unavailable nodes in async
scheduling. Contributed by Tao Yang.

(cherry picked from commit 0a71bf145293adbd3728525ab4c36c08d51377d3)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/734bc422
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/734bc422
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/734bc422

Branch: refs/heads/branch-3.1
Commit: 734bc4228974512ef8ff9fc6e339ac281d0fb1d4
Parents: 991514f
Author: Weiwei Yang <w...@apache.org>
Authored: Fri Aug 10 14:37:45 2018 +0800
Committer: Weiwei Yang <w...@apache.org>
Committed: Fri Aug 10 15:10:27 2018 +0800

----------------------------------------------------------------------
 .../scheduler/common/fica/FiCaSchedulerApp.java | 12 ++++
 .../yarn/server/resourcemanager/MockNodes.java  |  6 +-
 .../resourcemanager/TestResourceManager.java    | 16 ++++-
 .../TestCapacitySchedulerAsyncScheduling.java   | 69 ++++++++++++++++++++
 .../scheduler/capacity/TestUtils.java           |  2 +
 5 files changed, 100 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/734bc422/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9810e98..6a5af81 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -429,6 +430,17 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
         SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
             schedulerContainer = allocation.getAllocatedOrReservedContainer();
 
+        // Make sure node is in RUNNING state
+        if (schedulerContainer.getSchedulerNode().getRMNode().getState()
+            != NodeState.RUNNING) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to accept this proposal because node "
+                + schedulerContainer.getSchedulerNode().getNodeID() + " is in "
+                + schedulerContainer.getSchedulerNode().getRMNode().getState()
+                + " state (not RUNNING)");
+          }
+          return false;
+        }
         if (schedulerContainer.isAllocated()) {
           // When allocate a new container
           containerRequest =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734bc422/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 9041132..c444b6e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -347,17 +347,17 @@ public class MockNodes {
   }
 
   public static RMNode newNodeInfo(int rack, final Resource perNode, int 
hostnum) {
-    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123);
+    return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", 
hostnum, null, 123);
   }
   
   public static RMNode newNodeInfo(int rack, final Resource perNode,
       int hostnum, String hostName) {
-    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 
123);
+    return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", 
hostnum, hostName, 123);
   }
 
   public static RMNode newNodeInfo(int rack, final Resource perNode,
       int hostnum, String hostName, int port) {
-    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 
port);
+    return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", 
hostnum, hostName, port);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734bc422/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
index 941e477..a66c583 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -86,8 +89,9 @@ public class TestResourceManager {
   }
 
   @Test
-  public void testResourceAllocation() throws IOException,
-      YarnException, InterruptedException {
+  public void testResourceAllocation()
+      throws IOException, YarnException, InterruptedException,
+      TimeoutException {
     LOG.info("--- START: testResourceAllocation ---");
         
     final int memory = 4 * 1024;
@@ -105,6 +109,14 @@ public class TestResourceManager {
       registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
           Resources.createResource(memory/2, vcores/2));
 
+    // nodes should be in RUNNING state
+    RMNodeImpl node1 = (RMNodeImpl) 
resourceManager.getRMContext().getRMNodes().get(
+        nm1.getNodeId());
+    RMNodeImpl node2 = (RMNodeImpl) 
resourceManager.getRMContext().getRMNodes().get(
+        nm2.getNodeId());
+    node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null));
+    node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null));
+
     // Submit an application
     Application application = new Application("user1", resourceManager);
     application.submit();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734bc422/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index c2c1519..840d30d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -43,6 +45,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -745,6 +749,71 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm1.close();
   }
 
+  @Test(timeout = 30000)
+  public void testCommitProposalsForUnusableNode() throws Exception {
+    // disable async-scheduling for simulating complex scene
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
+    final MockNM nm3 = rm.registerNode("192.168.0.3:2234", 8 * GB);
+    rm.drainEvents();
+    CapacityScheduler cs =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    // launch app1-am on nm1
+    RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+    // launch app2-am on nm2
+    RMApp app2 = rm.submitApp(1 * GB, "app2", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+    // app2 asks 1 * 8G container
+    am2.allocate(ImmutableList.of(ResourceRequest
+        .newInstance(Priority.newInstance(0), "*",
+            Resources.createResource(8 * GB), 1)), null);
+
+    List<Object> reservedProposalParts = new ArrayList<>();
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    // handle CapacityScheduler#tryCommit
+    Mockito.doAnswer(new Answer<Object>() {
+      public Boolean answer(InvocationOnMock invocation) throws Exception {
+        for (Object argument : invocation.getArguments()) {
+          reservedProposalParts.add(argument);
+        }
+        return false;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
+
+    spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+
+    // decommission nm1
+    RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
+    cs.getRMContext().getDispatcher().getEventHandler().handle(
+        new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION));
+    rm.drainEvents();
+    Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()));
+
+    // try commit after nm1 decommissioned
+    boolean isSuccess =
+        cs.tryCommit((Resource) reservedProposalParts.get(0),
+            (ResourceCommitRequest) reservedProposalParts.get(1),
+            (Boolean) reservedProposalParts.get(2));
+    Assert.assertFalse(isSuccess);
+    rm.stop();
+  }
+
   private ResourceCommitRequest createAllocateFromReservedProposal(
       int containerId, Resource allocateResource, FiCaSchedulerApp 
schedulerApp,
       SchedulerNode allocateNode, SchedulerNode reservedNode,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/734bc422/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index fae63be..b13790d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
@@ -220,6 +221,7 @@ public class TestUtils {
     when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
     when(rmNode.getHostName()).thenReturn(host);
     when(rmNode.getRackName()).thenReturn(rack);
+    when(rmNode.getState()).thenReturn(NodeState.RUNNING);
     
     FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false));
     LOG.info("node = " + host + " avail=" + node.getUnallocatedResource());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to