Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 3deeb9a15 -> 9d3c51eb5
YARN-4311. Removing nodes from include and exclude lists will not remove them
from decommissioned nodes list. Contributed by Kuhu Shukla
(cherry picked from commit 1cbcd4a491e6a57d466c2897335614dc6770b475)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d3c51eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d3c51eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d3c51eb
Branch: refs/heads/branch-2.8
Commit: 9d3c51eb5a701ae4ba296e2fd3fba154fa0097c0
Parents: 3deeb9a
Author: Jason Lowe <[email protected]>
Authored: Tue Apr 5 13:47:05 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Tue Apr 5 13:47:05 2016 +0000
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 9 +
.../yarn/sls/scheduler/RMNodeWrapper.java | 9 +
.../hadoop/yarn/conf/YarnConfiguration.java | 9 +
.../src/main/resources/yarn-default.xml | 13 ++
.../resourcemanager/NodesListManager.java | 104 ++++++++-
.../server/resourcemanager/RMServerUtils.java | 2 +-
.../resourcemanager/ResourceTrackerService.java | 8 +-
.../server/resourcemanager/rmnode/RMNode.java | 4 +
.../resourcemanager/rmnode/RMNodeImpl.java | 22 +-
.../yarn/server/resourcemanager/MockNodes.java | 9 +
.../TestResourceTrackerService.java | 216 +++++++++++++++++--
.../webapp/TestRMWebServicesNodes.java | 12 +-
12 files changed, 387 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 92d586b..951f5a8 100644
---
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -199,6 +199,15 @@ public class NodeInfo {
public ResourceUtilization getNodeUtilization() {
return null;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 2e9cccb..e5013c4 100644
---
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -188,4 +188,13 @@ public class RMNodeWrapper implements RMNode {
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return node.getUntrackedTimeStamp();
+ }
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ node.setUntrackedTimeStamp(timeStamp);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8018d1c..f907104 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -639,6 +639,15 @@ public class YarnConfiguration extends Configuration {
"NONE";
/**
+ * Timeout (msec) for an untracked node to remain in shutdown or decommissioned
+ * state.
+ */
+ public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
+ RM_PREFIX + "node-removal-untracked.timeout-ms";
+ public static final int
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
+
+ /**
* RM proxy users' prefix
*/
public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a588893..d053a9a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2603,4 +2603,17 @@
<name>yarn.node-labels.fs-store.impl.class</name>
<value>org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore</value>
</property>
+
+ <property>
+ <description>
+ The minimum time (msec) an inactive (decommissioned or shutdown) node can
+ stay in the resourcemanager's nodes list after being declared untracked.
+ A node is untracked if and only if the include list is non-empty and the node
+ is absent from both the include and exclude nodemanager lists on the RM. All
+ inactive nodes are checked and marked accordingly twice per timeout interval or
+ every 10 minutes, whichever is more frequent, and whenever refreshNodes (graceful or otherwise) is invoked.
+ </description>
+ <name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
+ <value>60000</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 89027b1..9585a6e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService
implements
private String excludesFile;
private Resolver resolver;
+ private Timer removalTimer;
+ private int nodeRemovalCheckInterval;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
@@ -104,9 +107,56 @@ public class NodesListManager extends CompositeService
implements
} catch (IOException ioe) {
disableHostsFileReader(ioe);
}
+
+ final int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ // Twice per timeout, at most every 10 min; Timer rejects a period <= 0.
+ nodeRemovalCheckInterval =
+ Math.max(1, Math.min(nodeRemovalTimeout / 2, 600000));
+ removalTimer = new Timer("Node Removal Timer");
+ removalTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ long now = Time.monotonicNow();
+ for (Map.Entry<NodeId, RMNode> entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (!isUntrackedNode(rmNode.getHostName())) {
+ rmNode.setUntrackedTimeStamp(0); // tracked again: reset the clock
+ } else if (rmNode.getUntrackedTimeStamp() == 0) {
+ rmNode.setUntrackedTimeStamp(now); // first seen as untracked
+ } else if (now - rmNode.getUntrackedTimeStamp() > nodeRemovalTimeout) {
+ RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
+ if (result == null) {
+ continue; // entry was already removed concurrently
+ }
+ // Undo the counter for the node's actual inactive state.
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+ switch (result.getState()) {
+ case SHUTDOWN: clusterMetrics.decrNumShutdownNMs(); break;
+ case LOST: clusterMetrics.decrNumLostNMs(); break;
+ case REBOOTED: clusterMetrics.decrNumRebootedNMs(); break;
+ default: clusterMetrics.decrDecommisionedNMs(); break;
+ }
+ LOG.info("Removed " + result.getHostName() +
+ " from inactive nodes list");
+ }
+ }
+ }
+ }, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
+
super.serviceInit(conf);
}
+ @Override
+ public void serviceStop() throws Exception {
+ if (removalTimer != null) { removalTimer.cancel(); } // null if init never reached the timer
+ super.serviceStop(); // let CompositeService stop its child services
+ }
private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) {
return;
@@ -130,10 +180,13 @@ public class NodesListManager extends CompositeService
implements
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
}
}
+ updateInactiveNodes();
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
@@ -171,6 +224,16 @@ public class NodesListManager extends CompositeService
implements
}
@VisibleForTesting
+ public int getNodeRemovalCheckInterval() {
+ return nodeRemovalCheckInterval;
+ }
+
+ @VisibleForTesting
+ public void setNodeRemovalCheckInterval(int interval) {
+ this.nodeRemovalCheckInterval = interval;
+ }
+
+ @VisibleForTesting
public Resolver getResolver() {
return resolver;
}
@@ -373,6 +436,33 @@ public class NodesListManager extends CompositeService
implements
return hostsReader;
}
+ private void updateInactiveNodes() {
+ long now = Time.monotonicNow();
+ for(Entry<NodeId, RMNode> entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (isUntrackedNode(nodeId.getHost()) &&
+ rmNode.getUntrackedTimeStamp() == 0) {
+ rmNode.setUntrackedTimeStamp(now);
+ }
+ }
+ }
+
+ public boolean isUntrackedNode(String hostName) {
+ boolean untracked;
+ String ip = resolver.resolve(hostName);
+
+ synchronized (hostsReader) {
+ Set<String> hostsList = hostsReader.getHosts();
+ Set<String> excludeList = hostsReader.getExcludedHosts();
+ untracked = !hostsList.isEmpty() &&
+ !hostsList.contains(hostName) && !hostsList.contains(ip) &&
+ !excludeList.contains(hostName) && !excludeList.contains(ip);
+ }
+ return untracked;
+ }
+
/**
* Refresh the nodes gracefully
*
@@ -383,11 +473,13 @@ public class NodesListManager extends CompositeService
implements
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
- for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) {
+ for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@@ -396,6 +488,7 @@ public class NodesListManager extends CompositeService
implements
}
}
}
+ updateInactiveNodes();
}
/**
@@ -419,8 +512,11 @@ public class NodesListManager extends CompositeService
implements
public void refreshNodesForcefully() {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
+ RMNodeEventType nodeEventType =
+ isUntrackedNode(entry.getKey().getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(entry.getKey(), nodeEventType));
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e19d55e..1318d58 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -87,7 +87,7 @@ public class RMServerUtils {
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
- if (acceptedStates.contains(rmNode.getState())) {
+ if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index f52b5ab..6cb08bc 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -315,7 +315,8 @@ public class ResourceTrackerService extends AbstractService
implements
}
// Check if this node is a 'valid' node
- if (!this.nodesListManager.isValidNode(host)) {
+ if (!this.nodesListManager.isValidNode(host) ||
+ this.nodesListManager.isUntrackedNode(host)) {
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.";
@@ -446,8 +447,9 @@ public class ResourceTrackerService extends AbstractService
implements
// 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
// in decommissioning.
- if (!this.nodesListManager.isValidNode(nodeId.getHost())
- && !isNodeInDecommissioning(nodeId)) {
+ if ((!this.nodesListManager.isValidNode(nodeId.getHost()) &&
+ !isNodeInDecommissioning(nodeId)) ||
+ this.nodesListManager.isUntrackedNode(nodeId.getHost())) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f1..e599576 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -168,4 +168,8 @@ public interface RMNode {
NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers();
+
+ long getUntrackedTimeStamp(); // monotonic ms when marked untracked, 0 if not
+
+ void setUntrackedTimeStamp(long timeStamp); // 0 clears the mark
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 607e46c..d2aa6b9 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -39,6 +39,7 @@ import
org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -120,6 +121,7 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
private long lastHealthReportTime;
private String nodeManagerVersion;
+ private volatile long timeStamp; // also read/written by NodesListManager's removal timer thread
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
@@ -259,6 +261,9 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
+ RMNodeEventType.SHUTDOWN,
+ new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@@ -346,6 +351,7 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
+ this.timeStamp = 0;
this.latestNodeHeartBeatResponse.setResponseId(0);
@@ -997,7 +1003,7 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
}
/**
- * Put a node in deactivated (decommissioned) status.
+ * Put a node in deactivated (decommissioned or shutdown) status.
* @param rmNode
* @param finalState
*/
@@ -1014,6 +1020,10 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+ if (finalState == NodeState.SHUTDOWN &&
+ rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
+ rmNode.setUntrackedTimeStamp(Time.monotonicNow()); // start the removal clock
+ }
}
/**
@@ -1385,4 +1395,14 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
public Resource getOriginalTotalCapability() {
return this.originalTotalCapability;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return this.timeStamp;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long ts) {
+ this.timeStamp = ts;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 89aff29..921b18e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -260,6 +260,15 @@ public class MockNodes {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 9ed79a3..dd37b67 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -31,6 +31,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
@@ -48,8 +50,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
@@ -141,12 +141,12 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++metricCount);
+ checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
- .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@@ -155,7 +155,8 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
- .getNumDecommisionedNMs());
+ .getNumShutdownNMs());
+ rm.stop();
}
/**
@@ -228,7 +229,7 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
- int initialMetricCount = metrics.getNumDecommisionedNMs();
+ int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
NodeAction.NORMAL,
@@ -241,16 +242,16 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++initialMetricCount);
+ checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
- "Node should not have been decomissioned.",
+ "Node should not have been shutdown.",
NodeAction.NORMAL,
nodeHeartbeat.getNodeAction());
- nodeHeartbeat = nm2.nodeHeartbeat(true);
- Assert.assertEquals("Node should have been decomissioned but is in state" +
- nodeHeartbeat.getNodeAction(),
- NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
+ NodeState nodeState =
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
+ Assert.assertEquals("Node should have been shutdown but is in state" +
+ nodeState, NodeState.SHUTDOWN, nodeState);
}
/**
@@ -1123,8 +1124,6 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
- int shutdownNMsCount = ClusterMetrics.getMetrics()
- .getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
@@ -1149,10 +1148,12 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN,
heartbeatResponse.getNodeAction());
+ int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
+ shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
@@ -1168,8 +1169,9 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
- checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkShutdownNMCount(rm, ++shutdownNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
+ rm.stop();
}
@Test(timeout = 30000)
@@ -1304,6 +1306,186 @@ public class TestResourceTrackerService extends
NodeLabelTestBase {
rm.stop();
}
+ /**
+ * Remove a node from all lists and check if its forgotten
+ */
+ @Test
+ public void testNodeRemovalNormally() throws Exception {
+ testNodeRemovalUtil(false);
+ }
+
+ @Test
+ public void testNodeRemovalGracefully() throws Exception {
+ testNodeRemovalUtil(true);
+ }
+
+ public void refreshNodesOption(boolean doGraceful, Configuration conf)
+ throws Exception {
+ if (doGraceful) {
+ rm.getNodesListManager().refreshNodesGracefully(conf);
+ } else {
+ rm.getNodesListManager().refreshNodes(conf);
+ }
+ }
+
+ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
+ Configuration conf = new Configuration();
+ int timeoutValue = 500;
+ File excludeHostFile = new File(TEMP_DIR + File.separator +
+ "excludeHostFile.txt");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
+ conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ timeoutValue);
+ CountDownLatch latch = new CountDownLatch(1); // never counted down: await() is a bounded sleep
+ rm = new MockRM(conf);
+ rm.init(conf);
+ rm.start();
+ RMContext rmContext = rm.getRMContext();
+ refreshNodesOption(doGraceful, conf);
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
+ assert (metrics != null);
+
+ //check all 3 nodes joined in as NORMAL
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+ nodeHeartbeat = nm3.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+ rm.drainEvents();
+ Assert.assertEquals("All 3 nodes should be active",
+ 3, metrics.getNumActiveNMs());
+
+ //Remove nm2 from include list, should now be shutdown with timer test
+ String ip = NetUtils.normalizeHostName("localhost");
+ writeToHostsFile("host1", ip);
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ rm.drainEvents();
+ Assert.assertFalse("Node should not be in active node list",
+ rmContext.getRMNodes().containsKey(nm2.getNodeId()));
+
+ RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be in inactive node list",
+ NodeState.SHUTDOWN, rmNode.getState());
+ Assert.assertEquals("Active nodes should be 2",
+ 2, metrics.getNumActiveNMs());
+ Assert.assertEquals("Shutdown nodes should be 1",
+ 1, metrics.getNumShutdownNMs());
+
+ int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ int nodeRemovalInterval =
+ rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+ long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+ rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertNull("Node should have been forgotten!",
+ rmNode);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ 0, metrics.getNumShutdownNMs());
+
+ //Check node removal and re-addition before timer expires
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ rm.drainEvents();
+ writeToHostsFile("host1", ip);
+ refreshNodesOption(doGraceful, conf);
+ rm.drainEvents();
+ rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be shutdown",
+ NodeState.SHUTDOWN, rmNode.getState());
+ Assert.assertEquals("Active nodes should be 2",
+ 2, metrics.getNumActiveNMs());
+ Assert.assertEquals("Shutdown nodes should be 1",
+ 1, metrics.getNumShutdownNMs());
+
+ //add back the node before timer expires
+ latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ rm.drainEvents();
+ Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ 0, metrics.getNumShutdownNMs());
+ Assert.assertEquals("All 3 nodes should be active",
+ 3, metrics.getNumActiveNMs());
+
+ //Decommission this node, check timer doesn't remove it
+ writeToHostsFile("host1", "host2", ip);
+ writeToHostsFile(excludeHostFile, "host2");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ rm.drainEvents();
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ 1, metrics.getNumDecommisionedNMs());
+ }
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ 1, metrics.getNumDecommisionedNMs());
+ }
+
+ //Test decommed/ing node that transitions to untracked,timer should remove
+ writeToHostsFile("host1", ip, "host2");
+ writeToHostsFile(excludeHostFile, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ //nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertNotNull("Timer for this node was not canceled!",
+ rmNode);
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+
+ writeToHostsFile("host1", ip);
+ writeToHostsFile(excludeHostFile, "");
+ refreshNodesOption(doGraceful, conf);
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertNull("Node should have been forgotten!",
+ rmNode);
+ Assert.assertEquals("Decommissioned nodes should be 0 now",
+ 0, metrics.getNumDecommisionedNMs());
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ 0, metrics.getNumShutdownNMs());
+ Assert.assertEquals("Active nodes should be 2",
+ 2, metrics.getNumActiveNMs());
+
+ rm.stop();
+ }
+
/**
 * Writes the given host names to this test's default {@code hostFile}.
 * Convenience form that delegates to the overload taking an explicit
 * target file.
 *
 * @param hosts host names (or addresses) to write
 * @throws IOException if writing the hosts file fails
 */
private void writeToHostsFile(String... hosts) throws IOException {
  final String[] entries = hosts;
  this.writeToHostsFile(hostFile, entries);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c51eb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 3fd1fd5..4b6ca12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -272,8 +272,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
- .toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
}
@@ -304,8 +306,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state",
- rmNode.getState().toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
@Test