Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 162ee0f0a -> f9016dfec


YARN-5566. Client-side NM graceful decom is not triggered when jobs finish. 
(Robert Kanter via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f9016dfe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f9016dfe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f9016dfe

Branch: refs/heads/branch-2.8
Commit: f9016dfec33f1d6486c03a54f0a479ed08aff34f
Parents: 162ee0f
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Tue Sep 6 16:23:06 2016 -0700
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Tue Sep 6 16:23:06 2016 -0700

----------------------------------------------------------------------
 .../resourcemanager/rmnode/RMNodeImpl.java      |  31 ++---
 .../resourcemanager/TestRMNodeTransitions.java  |  29 +++--
 .../TestResourceTrackerService.java             | 112 +++++++++++++++++++
 3 files changed, 143 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9016dfe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index b716247..830a6a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1133,12 +1133,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       NodeState initialState = rmNode.getState();
       boolean isNodeDecommissioning =
           initialState.equals(NodeState.DECOMMISSIONING);
+      if (isNodeDecommissioning) {
+        List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
+        if (rmNode.runningApplications.isEmpty() &&
+            (keepAliveApps == null || keepAliveApps.isEmpty())) {
+          RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
+          return NodeState.DECOMMISSIONED;
+        }
+      }
+
       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
         LOG.info("Node " + rmNode.nodeId +
             " reported UNHEALTHY with details: " +
             remoteNodeHealthStatus.getHealthReport());
         // if a node in decommissioning receives an unhealthy report,
-        // it will keep decommissioning.
+        // it will stay in decommissioning.
         if (isNodeDecommissioning) {
           return NodeState.DECOMMISSIONING;
         } else {
@@ -1146,24 +1155,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           return NodeState.UNHEALTHY;
         }
       }
-      if (isNodeDecommissioning) {
-        List<ApplicationId> runningApps = rmNode.getRunningApps();
-
-        List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
-
-        // no running (and keeping alive) app on this node, get it
-        // decommissioned.
-        // TODO may need to check no container is being scheduled on this node
-        // as well.
-        if ((runningApps == null || runningApps.size() == 0)
-            && (keepAliveApps == null || keepAliveApps.size() == 0)) {
-          RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
-          return NodeState.DECOMMISSIONED;
-        }
-
-        // TODO (in YARN-3223) if node in decommissioning, get node resource
-        // updated if container get finished (keep available resource to be 0)
-      }
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
       rmNode.handleReportedIncreasedContainers(
@@ -1337,7 +1328,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
               + " is the first container get launched for application "
               + containerAppId);
         }
-        runningApplications.add(containerAppId);
+        handleRunningAppOnNode(this, context, containerAppId, nodeId);
       }
 
       // Process running containers

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9016dfe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 83a7c73..5a462ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
     .AllocationExpirationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -73,6 +74,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -290,16 +292,17 @@ public class TestRMNodeTransitions {
     NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
     RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
     node2.handle(new RMNodeStartedEvent(null, null, null));
-    
+
+    ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
+    ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
     ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
-        BuilderUtils.newApplicationAttemptId(
-            BuilderUtils.newApplicationId(0, 0), 0), 0);
+        BuilderUtils.newApplicationAttemptId(app0, 0), 0);
     ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
-        BuilderUtils.newApplicationAttemptId(
-            BuilderUtils.newApplicationId(1, 1), 1), 1);
+        BuilderUtils.newApplicationAttemptId(app1, 1), 1);
     ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
-        BuilderUtils.newApplicationAttemptId(
-            BuilderUtils.newApplicationId(1, 1), 1), 2);
+        BuilderUtils.newApplicationAttemptId(app1, 1), 2);
+    rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));
+    rmContext.getRMApps().put(app1, Mockito.mock(RMApp.class));
 
     RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
     RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
@@ -663,6 +666,7 @@ public class TestRMNodeTransitions {
     NodeId nodeId = node.getNodeID();
 
     ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
+    rmContext.getRMApps().put(runningAppId, Mockito.mock(RMApp.class));
     // Create a running container
     ContainerId runningContainerId = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
@@ -930,16 +934,22 @@ public class TestRMNodeTransitions {
   }
 
   // Test unhealthy report on a decommissioning node will make it
-  // keep decommissioning.
+  // keep decommissioning as long as there's a running or keep alive app.
+  // Otherwise, it will go to decommissioned
   @Test
   public void testDecommissioningUnhealthy() {
     RMNodeImpl node = getDecommissioningNode();
     NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
         System.currentTimeMillis());
+    List<ApplicationId> keepAliveApps = new ArrayList<>();
+    keepAliveApps.add(BuilderUtils.newApplicationId(1, 1));
     NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
-        new ArrayList<ContainerStatus>(), null, status, null, null, null);
+        null, keepAliveApps, status, null, null, null);
     node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
     Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    nodeStatus.setKeepAliveApplications(null);
+    node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
   }
 
   @Test
@@ -962,6 +972,7 @@ public class TestRMNodeTransitions {
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);
+    rmContext.getRMApps().put(appId, Mockito.mock(RMApp.class));
     ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
     ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
     AllocationExpirationInfo expirationInfo1 =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9016dfe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 96efcfd..f3f8644 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -216,6 +217,117 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
   }
 
   /**
+   * Graceful decommission node with no running application.
+   */
+  @Test
+  public void testGracefulDecommissionNoApp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("host3:4433", 5120);
+
+    int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
+
+    rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
+    rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
+
+    // Graceful decommission both host2 and host3.
+    writeToHostsFile("host2", "host3");
+    rm.getNodesListManager().refreshNodesGracefully(conf);
+
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
+
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED);
+    nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED);
+
+    checkDecommissionedNMCount(rm, metricCount + 2);
+
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction());
+    nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
+  }
+
+  /**
+   * Graceful decommission node with running application.
+   */
+  @Test
+  public void testGracefulDecommissionWithApp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    MockNM nm2 = rm.registerNode("host2:5678", 20480);
+    MockNM nm3 = rm.registerNode("host3:4433", 10240);
+    NodeId id1 = nm1.getNodeId();
+    NodeId id3 = nm3.getNodeId();
+    rm.waitForState(id1, NodeState.RUNNING);
+    rm.waitForState(id3, NodeState.RUNNING);
+
+    // Create an app and launch two containers on host1.
+    RMApp app = rm.submitApp(2000);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+    nm3.nodeHeartbeat(true);
+
+    // Graceful decommission host1 and host3
+    writeToHostsFile("host1", "host3");
+    rm.getNodesListManager().refreshNodesGracefully(conf);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    rm.waitForState(id3, NodeState.DECOMMISSIONING);
+
+    // host1 should be DECOMMISSIONING due to running containers.
+    // host3 should become DECOMMISSIONED.
+    nm1.nodeHeartbeat(true);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    nm3.nodeHeartbeat(true);
+    rm.waitForState(id3, NodeState.DECOMMISSIONED);
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+
+    // Complete containers on host1.
+    // Since the app is still RUNNING, expect NodeAction.NORMAL.
+    NodeHeartbeatResponse nodeHeartbeat1 =
+        nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
+
+    // Finish the app and verified DECOMMISSIONED.
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
+    rm.waitForState(id1, NodeState.DECOMMISSIONED);
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction());
+  }
+
+  /**
   * Decommissioning using a post-configured include hosts file
   */
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to