YARN-4676. Automatic and Asynchronous Decommissioning Nodes Status Tracking.
Contributed by Daniel Zhi.
(cherry picked from commit d464483bf7f0b3e3be3ba32cd6c3eee546747ab5)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0da69c32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0da69c32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0da69c32
Branch: refs/heads/HADOOP-13345
Commit: 0da69c324dee9baab0f0b9700db1cc5b623f8421
Parents: 040c185
Author: Junping Du <[email protected]>
Authored: Thu Aug 18 07:23:29 2016 -0700
Committer: Junping Du <[email protected]>
Committed: Thu Aug 18 07:27:23 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/util/HostsFileReader.java | 111 ++++-
.../apache/hadoop/util/TestHostsFileReader.java | 64 ++-
hadoop-project/src/site/site.xml | 1 +
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 +
.../yarn/sls/scheduler/RMNodeWrapper.java | 5 +
.../hadoop/yarn/conf/YarnConfiguration.java | 14 +
.../protocolrecords/RefreshNodesRequest.java | 26 +-
..._server_resourcemanager_service_protos.proto | 1 +
.../hadoop/yarn/client/cli/RMAdminCLI.java | 166 ++++---
.../hadoop/yarn/client/cli/TestRMAdminCLI.java | 24 +-
.../impl/pb/RefreshNodesRequestPBImpl.java | 17 +-
.../src/main/resources/yarn-default.xml | 18 +
.../server/resourcemanager/AdminService.java | 3 +-
.../DecommissioningNodesWatcher.java | 439 +++++++++++++++++++
.../resourcemanager/NodesListManager.java | 166 +++++--
.../server/resourcemanager/RMServerUtils.java | 2 +-
.../resourcemanager/ResourceTrackerService.java | 19 +
.../server/resourcemanager/rmnode/RMNode.java | 6 +
.../rmnode/RMNodeDecommissioningEvent.java | 41 ++
.../resourcemanager/rmnode/RMNodeImpl.java | 54 ++-
.../webapp/dao/ClusterMetricsInfo.java | 2 +-
.../yarn/server/resourcemanager/MockNodes.java | 5 +
.../yarn/server/resourcemanager/MockRM.java | 11 +-
.../TestDecommissioningNodesWatcher.java | 131 ++++++
.../resourcemanager/TestRMNodeTransitions.java | 11 -
.../TestResourceTrackerService.java | 199 +++++++--
.../resourcetracker/TestNMReconnect.java | 2 -
.../src/site/markdown/YarnCommands.md | 2 +-
28 files changed, 1326 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index 1cba426..2ef1ead 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -21,16 +21,27 @@ package org.apache.hadoop.util;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Set;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
-import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
// Keeps track of which datanodes/tasktrackers are allowed to connect to the
// namenode/jobtracker.
@@ -38,7 +49,9 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public class HostsFileReader {
private Set<String> includes;
- private Set<String> excludes;
+ // exclude host list with optional timeout.
+ // If the value is null, it indicates default timeout.
+ private Map<String, Integer> excludes;
private String includesFile;
private String excludesFile;
private WriteLock writeLock;
@@ -49,7 +62,7 @@ public class HostsFileReader {
public HostsFileReader(String inFile,
String exFile) throws IOException {
includes = new HashSet<String>();
- excludes = new HashSet<String>();
+ excludes = new HashMap<String, Integer>();
includesFile = inFile;
excludesFile = exFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -62,7 +75,7 @@ public class HostsFileReader {
public HostsFileReader(String includesFile, InputStream inFileInputStream,
String excludesFile, InputStream exFileInputStream) throws IOException {
includes = new HashSet<String>();
- excludes = new HashSet<String>();
+ excludes = new HashMap<String, Integer>();
this.includesFile = includesFile;
this.excludesFile = excludesFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -121,6 +134,73 @@ public class HostsFileReader {
}
}
+ public static void readFileToMap(String type,
+ String filename, Map<String, Integer> map) throws IOException {
+ File file = new File(filename);
+ FileInputStream fis = new FileInputStream(file);
+ readFileToMapWithFileInputStream(type, filename, fis, map);
+ }
+
+ public static void readFileToMapWithFileInputStream(String type,
+ String filename, InputStream inputStream, Map<String, Integer> map)
+ throws IOException {
+ // The input file could be either simple text or XML.
+ boolean xmlInput = filename.toLowerCase().endsWith(".xml");
+ if (xmlInput) {
+ readXmlFileToMapWithFileInputStream(type, filename, inputStream, map);
+ } else {
+ HashSet<String> nodes = new HashSet<String>();
+ readFileToSetWithFileInputStream(type, filename, inputStream, nodes);
+ for (String node : nodes) {
+ map.put(node, null);
+ }
+ }
+ }
+
+ public static void readXmlFileToMapWithFileInputStream(String type,
+ String filename, InputStream fileInputStream, Map<String, Integer> map)
+ throws IOException {
+ Document dom;
+ DocumentBuilderFactory builder = DocumentBuilderFactory.newInstance();
+ try {
+ DocumentBuilder db = builder.newDocumentBuilder();
+ dom = db.parse(fileInputStream);
+ // Examples:
+ // <host><name>host1</name></host>
+ // <host><name>host2</name><timeout>123</timeout></host>
+ // <host><name>host3</name><timeout>-1</timeout></host>
+ // <host><name>host4, host5,host6</name><timeout>1800</timeout></host>
+ Element doc = dom.getDocumentElement();
+ NodeList nodes = doc.getElementsByTagName("host");
+ for (int i = 0; i < nodes.getLength(); i++) {
+ Node node = nodes.item(i);
+ if (node.getNodeType() == Node.ELEMENT_NODE) {
+ Element e= (Element) node;
+ // Support both single host and comma-separated list of hosts.
+ String v = readFirstTagValue(e, "name");
+ String[] hosts = StringUtils.getTrimmedStrings(v);
+ String str = readFirstTagValue(e, "timeout");
+ Integer timeout = (str == null)? null : Integer.parseInt(str);
+ for (String host : hosts) {
+ map.put(host, timeout);
+ LOG.info("Adding a node \"" + host + "\" to the list of "
+ + type + " hosts from " + filename);
+ }
+ }
+ }
+ } catch (IOException|SAXException|ParserConfigurationException e) {
+ LOG.fatal("error parsing " + filename, e);
+ throw new RuntimeException(e);
+ } finally {
+ fileInputStream.close();
+ }
+ }
+
+ static String readFirstTagValue(Element e, String tag) {
+ NodeList nodes = e.getElementsByTagName(tag);
+ return (nodes.getLength() == 0)? null : nodes.item(0).getTextContent();
+ }
+
public void refresh(String includeFiles, String excludeFiles)
throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
@@ -129,7 +209,7 @@ public class HostsFileReader {
// update instance variables
updateFileNames(includeFiles, excludeFiles);
Set<String> newIncludes = new HashSet<String>();
- Set<String> newExcludes = new HashSet<String>();
+ Map<String, Integer> newExcludes = new HashMap<String, Integer>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (includeFiles != null && !includeFiles.isEmpty()) {
@@ -137,7 +217,7 @@ public class HostsFileReader {
switchIncludes = true;
}
if (excludeFiles != null && !excludeFiles.isEmpty()) {
- readFileToSet("excluded", excludeFiles, newExcludes);
+ readFileToMap("excluded", excludeFiles, newExcludes);
switchExcludes = true;
}
@@ -161,7 +241,7 @@ public class HostsFileReader {
this.writeLock.lock();
try {
Set<String> newIncludes = new HashSet<String>();
- Set<String> newExcludes = new HashSet<String>();
+ Map<String, Integer> newExcludes = new HashMap<String, Integer>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (inFileInputStream != null) {
@@ -170,7 +250,7 @@ public class HostsFileReader {
switchIncludes = true;
}
if (exFileInputStream != null) {
- readFileToSetWithFileInputStream("excluded", excludesFile,
+ readFileToMapWithFileInputStream("excluded", excludesFile,
exFileInputStream, newExcludes);
switchExcludes = true;
}
@@ -199,7 +279,7 @@ public class HostsFileReader {
public Set<String> getExcludedHosts() {
this.readLock.lock();
try {
- return excludes;
+ return excludes.keySet();
} finally {
this.readLock.unlock();
}
@@ -209,7 +289,18 @@ public class HostsFileReader {
this.readLock.lock();
try {
includes.addAll(this.includes);
- excludes.addAll(this.excludes);
+ excludes.addAll(this.excludes.keySet());
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public void getHostDetails(Set<String> includeHosts,
+ Map<String, Integer> excludeHosts) {
+ this.readLock.lock();
+ try {
+ includeHosts.addAll(this.includes);
+ excludeHosts.putAll(this.excludes);
} finally {
this.readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
index 8015f7a..5766591 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
@@ -20,16 +20,19 @@ package org.apache.hadoop.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
+import java.util.Map;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.*;
+
import static org.junit.Assert.*;
/*
* Test for HostsFileReader.java
- *
+ *
*/
public class TestHostsFileReader {
@@ -39,6 +42,7 @@ public class TestHostsFileReader {
File INCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.include");
String excludesFile = HOSTS_TEST_DIR + "/dfs.exclude";
String includesFile = HOSTS_TEST_DIR + "/dfs.include";
+ private String excludesXmlFile = HOSTS_TEST_DIR + "/dfs.exclude.xml";
@Before
public void setUp() throws Exception {
@@ -288,4 +292,62 @@ public class TestHostsFileReader {
assertFalse(hfp.getExcludedHosts().contains("somehost5"));
}
+
+ /*
+ * Test if timeout values are provided in HostFile
+ */
+ @Test
+ public void testHostFileReaderWithTimeout() throws Exception {
+ FileWriter efw = new FileWriter(excludesXmlFile);
+ FileWriter ifw = new FileWriter(includesFile);
+
+ efw.write("<?xml version=\"1.0\"?>\n");
+ efw.write("<!-- yarn.nodes.exclude -->\n");
+ efw.write("<hosts>\n");
+ efw.write("<host><name>host1</name></host>\n");
+ efw.write("<host><name>host2</name><timeout>123</timeout></host>\n");
+ efw.write("<host><name>host3</name><timeout>-1</timeout></host>\n");
+ efw.write("<host><name>10000</name></host>\n");
+ efw.write("<host><name>10001</name><timeout>123</timeout></host>\n");
+ efw.write("<host><name>10002</name><timeout>-1</timeout></host>\n");
+ efw.write("<host><name>host4,host5, host6</name>" +
+ "<timeout>1800</timeout></host>\n");
+ efw.write("</hosts>\n");
+ efw.close();
+
+ ifw.write("#Hosts-in-DFS\n");
+ ifw.write(" \n");
+ ifw.write(" somehost \t somehost2 \n somehost4");
+ ifw.write(" somehost3 \t # somehost5");
+ ifw.close();
+
+ HostsFileReader hfp = new HostsFileReader(includesFile, excludesXmlFile);
+
+ int includesLen = hfp.getHosts().size();
+ int excludesLen = hfp.getExcludedHosts().size();
+ assertEquals(4, includesLen);
+ assertEquals(9, excludesLen);
+
+ Set<String> includes = new HashSet<String>();
+ Map<String, Integer> excludes = new HashMap<String, Integer>();
+ hfp.getHostDetails(includes, excludes);
+ assertTrue(excludes.containsKey("host1"));
+ assertTrue(excludes.containsKey("host2"));
+ assertTrue(excludes.containsKey("host3"));
+ assertTrue(excludes.containsKey("10000"));
+ assertTrue(excludes.containsKey("10001"));
+ assertTrue(excludes.containsKey("10002"));
+ assertTrue(excludes.containsKey("host4"));
+ assertTrue(excludes.containsKey("host5"));
+ assertTrue(excludes.containsKey("host6"));
+ assertTrue(excludes.get("host1") == null);
+ assertTrue(excludes.get("host2") == 123);
+ assertTrue(excludes.get("host3") == -1);
+ assertTrue(excludes.get("10000") == null);
+ assertTrue(excludes.get("10001") == 123);
+ assertTrue(excludes.get("10002") == -1);
+ assertTrue(excludes.get("host4") == 1800);
+ assertTrue(excludes.get("host5") == 1800);
+ assertTrue(excludes.get("host6") == 1800);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 9fa1469..0d87973 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -136,6 +136,7 @@
<item name="Secure Containers"
href="hadoop-yarn/hadoop-yarn-site/SecureContainer.html"/>
<item name="Registry"
href="hadoop-yarn/hadoop-yarn-site/registry/index.html"/>
<item name="Reservation System"
href="hadoop-yarn/hadoop-yarn-site/ReservationSystem.html"/>
+ <item name="Graceful Decommission"
href="hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html"/>
</menu>
<menu name="YARN REST APIs" inherit="top">
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index bd737bd..c598aa0 100644
---
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -213,6 +213,11 @@ public class NodeInfo {
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
+
+ @Override
+ public Integer getDecommissioningTimeout() {
+ return null;
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 5048978..6d0ffbd 100644
---
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -202,4 +202,9 @@ public class RMNodeWrapper implements RMNode {
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
+
+ @Override
+ public Integer getDecommissioningTimeout() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8899ccd..770d139 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -795,6 +795,20 @@ public class YarnConfiguration extends Configuration {
*/
public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
+ /**
+ * Timeout in seconds for YARN node graceful decommission.
+ * This is the maximal time to wait for running containers and applications
+ * to complete before transition a DECOMMISSIONING node into DECOMMISSIONED.
+ */
+ public static final String RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT =
+ RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs";
+ public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600;
+
+ public static final String RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL =
+ RM_PREFIX + "decommissioning-nodes-watcher.poll-interval-secs";
+ public static final int
+ DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20;
+
////////////////////////////////
// Node Manager Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
index 0333c3b..732d98e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
@@ -43,6 +43,16 @@ public abstract class RefreshNodesRequest {
return request;
}
+ @Private
+ @Unstable
+ public static RefreshNodesRequest newInstance(
+ DecommissionType decommissionType, Integer timeout) {
+ RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class);
+ request.setDecommissionType(decommissionType);
+ request.setDecommissionTimeout(timeout);
+ return request;
+ }
+
/**
* Set the DecommissionType
*
@@ -56,4 +66,18 @@ public abstract class RefreshNodesRequest {
* @return decommissionType
*/
public abstract DecommissionType getDecommissionType();
-}
+
+ /**
+ * Set the DecommissionTimeout.
+ *
+ * @param timeout graceful decommission timeout in seconds
+ */
+ public abstract void setDecommissionTimeout(Integer timeout);
+
+ /**
+ * Get the DecommissionTimeout.
+ *
+ * @return decommissionTimeout
+ */
+ public abstract Integer getDecommissionTimeout();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index eaf658f..b9f30db 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -37,6 +37,7 @@ message RefreshQueuesResponseProto {
message RefreshNodesRequestProto {
optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
+ optional int32 decommissionTimeout = 2;
}
message RefreshNodesResponseProto {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index 4aa3a14..fcb9b74 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -99,10 +99,10 @@ public class RMAdminCLI extends HAAdmin {
"properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file."))
.put("-refreshNodes",
- new UsageInfo("[-g [timeout in seconds] -client|server]",
+ new UsageInfo("[-g|graceful [timeout in seconds]
-client|server]",
"Refresh the hosts information at the ResourceManager. Here "
- + "[-g [timeout in seconds] -client|server] is optional, if we "
- + "specify the timeout then ResourceManager will wait for "
+ + "[-g|graceful [timeout in seconds] -client|server] is
optional,"
+ + " if we specify the timeout then ResourceManager will wait for
"
+ "timeout before marking the NodeManager as decommissioned."
+ " The -client|server indicates if the timeout tracking should"
+ " be handled by the client or the ResourceManager. The client"
@@ -234,21 +234,23 @@ public class RMAdminCLI extends HAAdmin {
summary.append("rmadmin is the command to execute YARN administrative " +
"commands.\n");
summary.append("The full syntax is: \n\n" +
- "yarn rmadmin" +
- " [-refreshQueues]" +
- " [-refreshNodes [-g [timeout in seconds] -client|server]]" +
- " [-refreshNodesResources]" +
- " [-refreshSuperUserGroupsConfiguration]" +
- " [-refreshUserToGroupsMappings]" +
- " [-refreshAdminAcls]" +
- " [-refreshServiceAcl]" +
- " [-getGroup [username]]" +
- " [-addToClusterNodeLabels <\"label1(exclusive=true),"
- + "label2(exclusive=false),label3\">]" +
- " [-removeFromClusterNodeLabels <label1,label2,label3>]" +
- " [-replaceLabelsOnNode <\"node1[:port]=label1,label2
node2[:port]=label1\">]" +
- " [-directlyAccessNodeLabelStore]" +
- " [-updateNodeResource [NodeID] [MemSize] [vCores]
([OvercommitTimeout])");
+ "yarn rmadmin" +
+ " [-refreshQueues]" +
+ " [-refreshNodes [-g|graceful [timeout in seconds] -client|server]]" +
+ " [-refreshNodesResources]" +
+ " [-refreshSuperUserGroupsConfiguration]" +
+ " [-refreshUserToGroupsMappings]" +
+ " [-refreshAdminAcls]" +
+ " [-refreshServiceAcl]" +
+ " [-getGroup [username]]" +
+ " [-addToClusterNodeLabels <\"label1(exclusive=true),"
+ + "label2(exclusive=false),label3\">]" +
+ " [-removeFromClusterNodeLabels <label1,label2,label3>]" +
+ " [-replaceLabelsOnNode <\"node1[:port]=label1,label2" +
+ " node2[:port]=label1\">]" +
+ " [-directlyAccessNodeLabelStore]" +
+ " [-updateNodeResource [NodeID] [MemSize] [vCores]" +
+ " ([OvercommitTimeout])");
if (isHAEnabled) {
appendHAUsage(summary);
}
@@ -309,33 +311,40 @@ public class RMAdminCLI extends HAAdmin {
return 0;
}
- private int refreshNodes() throws IOException, YarnException {
+ private int refreshNodes(boolean graceful) throws IOException, YarnException
{
// Refresh the nodes
ResourceManagerAdministrationProtocol adminProtocol =
createAdminProtocol();
- RefreshNodesRequest request = RefreshNodesRequest
- .newInstance(DecommissionType.NORMAL);
+ RefreshNodesRequest request = RefreshNodesRequest.newInstance(
+ graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL);
adminProtocol.refreshNodes(request);
return 0;
}
- private int refreshNodes(long timeout, String trackingMode)
+ private int refreshNodes(int timeout, String trackingMode)
throws IOException, YarnException {
- if (!"client".equals(trackingMode)) {
- throw new UnsupportedOperationException(
- "Only client tracking mode is currently supported.");
- }
+ boolean serverTracking = !"client".equals(trackingMode);
// Graceful decommissioning with timeout
ResourceManagerAdministrationProtocol adminProtocol =
createAdminProtocol();
RefreshNodesRequest gracefulRequest = RefreshNodesRequest
- .newInstance(DecommissionType.GRACEFUL);
+ .newInstance(DecommissionType.GRACEFUL, timeout);
adminProtocol.refreshNodes(gracefulRequest);
+ if (serverTracking) {
+ return 0;
+ }
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest =
recordFactory
.newRecordInstance(CheckForDecommissioningNodesRequest.class);
long waitingTime;
boolean nodesDecommissioning = true;
+ // As RM enforces timeout automatically, client usually don't need
+ // to forcefully decommission nodes upon timeout.
+ // Here we let the client waits a small additional seconds so to avoid
+ // unnecessary double decommission.
+ final int gracePeriod = 5;
// timeout=-1 means wait for all the nodes to be gracefully
// decommissioned
- for (waitingTime = 0; waitingTime < timeout || timeout == -1;
waitingTime++) {
+ for (waitingTime = 0;
+ timeout == -1 || (timeout >= 0 && waitingTime < timeout + gracePeriod);
+ waitingTime++) {
// wait for one second to check nodes decommissioning status
try {
Thread.sleep(1000);
@@ -380,6 +389,10 @@ public class RMAdminCLI extends HAAdmin {
return 0;
}
+ private int refreshNodes() throws IOException, YarnException {
+ return refreshNodes(false);
+ }
+
private int refreshUserToGroupsMappings() throws IOException,
YarnException {
// Refresh the user-to-groups mappings
@@ -725,33 +738,12 @@ public class RMAdminCLI extends HAAdmin {
return exitCode;
}
}
-
+
try {
if ("-refreshQueues".equals(cmd)) {
exitCode = refreshQueues();
} else if ("-refreshNodes".equals(cmd)) {
- if (args.length == 1) {
- exitCode = refreshNodes();
- } else if (args.length == 3 || args.length == 4) {
- // if the graceful timeout specified
- if ("-g".equals(args[1])) {
- long timeout = -1;
- String trackingMode;
- if (args.length == 4) {
- timeout = validateTimeout(args[2]);
- trackingMode = validateTrackingMode(args[3]);
- } else {
- trackingMode = validateTrackingMode(args[2]);
- }
- exitCode = refreshNodes(timeout, trackingMode);
- } else {
- printUsage(cmd, isHAEnabled);
- return -1;
- }
- } else {
- printUsage(cmd, isHAEnabled);
- return -1;
- }
+ exitCode = handleRefreshNodes(args, cmd, isHAEnabled);
} else if ("-refreshNodesResources".equals(cmd)) {
exitCode = refreshNodesResources();
} else if ("-refreshUserToGroupsMappings".equals(cmd)) {
@@ -768,22 +760,7 @@ public class RMAdminCLI extends HAAdmin {
String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames);
} else if ("-updateNodeResource".equals(cmd)) {
- if (args.length < 4 || args.length > 5) {
- System.err.println("Number of parameters specified for " +
- "updateNodeResource is wrong.");
- printUsage(cmd, isHAEnabled);
- exitCode = -1;
- } else {
- String nodeID = args[i++];
- String memSize = args[i++];
- String cores = args[i++];
- int overCommitTimeout =
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
- if (i == args.length - 1) {
- overCommitTimeout = Integer.parseInt(args[i]);
- }
- exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize),
- Integer.parseInt(cores), overCommitTimeout);
- }
+ exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled);
} else if ("-addToClusterNodeLabels".equals(cmd)) {
if (i >= args.length) {
System.err.println(NO_LABEL_ERR_MSG);
@@ -843,10 +820,59 @@ public class RMAdminCLI extends HAAdmin {
return exitCode;
}
- private long validateTimeout(String strTimeout) {
- long timeout;
+ // A helper method to reduce the number of lines of run()
+ private int handleRefreshNodes(String[] args, String cmd, boolean
isHAEnabled)
+ throws IOException, YarnException {
+ if (args.length == 1) {
+ return refreshNodes();
+ } else if (args.length == 3 || args.length == 4) {
+ // if the graceful timeout specified
+ if ("-g".equals(args[1]) || "-graceful".equals(args[1])) {
+ int timeout = -1;
+ String trackingMode;
+ if (args.length == 4) {
+ timeout = validateTimeout(args[2]);
+ trackingMode = validateTrackingMode(args[3]);
+ } else {
+ trackingMode = validateTrackingMode(args[2]);
+ }
+ return refreshNodes(timeout, trackingMode);
+ } else {
+ printUsage(cmd, isHAEnabled);
+ return -1;
+ }
+ } else {
+ printUsage(cmd, isHAEnabled);
+ return -1;
+ }
+ }
+
+ private int handleUpdateNodeResource(
+ String[] args, String cmd, boolean isHAEnabled)
+ throws NumberFormatException, IOException, YarnException {
+ int i = 1;
+ if (args.length < 4 || args.length > 5) {
+ System.err.println("Number of parameters specified for " +
+ "updateNodeResource is wrong.");
+ printUsage(cmd, isHAEnabled);
+ return -1;
+ } else {
+ String nodeID = args[i++];
+ String memSize = args[i++];
+ String cores = args[i++];
+ int overCommitTimeout =
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
+ if (i == args.length - 1) {
+ overCommitTimeout = Integer.parseInt(args[i]);
+ }
+ return updateNodeResource(nodeID, Integer.parseInt(memSize),
+ Integer.parseInt(cores), overCommitTimeout);
+ }
+ }
+
+ private int validateTimeout(String strTimeout) {
+ int timeout;
try {
- timeout = Long.parseLong(strTimeout);
+ timeout = Integer.parseInt(strTimeout);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
index d3161ba..60c7eac 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
@@ -267,7 +267,7 @@ public class TestRMAdminCLI {
CheckForDecommissioningNodesRequest.class))).thenReturn(response);
assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshNodes(
- RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+ RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1));
verify(admin, never()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@@ -327,7 +327,7 @@ public class TestRMAdminCLI {
});
assertEquals(0, rmAdminCLI.run(args));
verify(admin, atLeastOnce()).refreshNodes(
- RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+ RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, -1));
verify(admin, never()).refreshNodes(
RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
}
@@ -346,10 +346,6 @@ public class TestRMAdminCLI {
String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
- // server tracking mode
- String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
- assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
-
// invalid tracking mode
String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
@@ -465,8 +461,9 @@ public class TestRMAdminCLI {
assertTrue(dataOut
.toString()
.contains(
- "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
- "seconds] -client|server]] [-refreshNodesResources] [-refresh" +
+ "yarn rmadmin [-refreshQueues] [-refreshNodes "+
+ "[-g|graceful [timeout in seconds] -client|server]] " +
+ "[-refreshNodesResources] [-refresh" +
"SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
"[username]] [-addToClusterNodeLabels " +
@@ -485,7 +482,8 @@ public class TestRMAdminCLI {
assertTrue(dataOut
.toString()
.contains(
- "-refreshNodes [-g [timeout in seconds] -client|server]: " +
+ "-refreshNodes [-g|graceful [timeout in seconds]" +
+ " -client|server]: " +
"Refresh the hosts information at the ResourceManager."));
assertTrue(dataOut
.toString()
@@ -518,8 +516,8 @@ public class TestRMAdminCLI {
testError(new String[] { "-help", "-refreshQueues" },
"Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodes" },
- "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
- "-client|server]]", dataErr, 0);
+ "Usage: yarn rmadmin [-refreshNodes [-g|graceful " +
+ "[timeout in seconds] -client|server]]", dataErr, 0);
testError(new String[] { "-help", "-refreshNodesResources" },
"Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@@ -558,8 +556,8 @@ public class TestRMAdminCLI {
assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
oldOutPrintStream.println(dataOut);
String expectedHelpMsg =
- "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
- + "seconds] -client|server]] "
+ "yarn rmadmin [-refreshQueues] [-refreshNodes [-g|graceful "
+ + "[timeout in seconds] -client|server]] "
+ "[-refreshNodesResources]
[-refreshSuperUserGroupsConfiguration] "
+ "[-refreshUserToGroupsMappings] "
+ "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
index 05f3230..c03a569 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
@@ -31,7 +31,6 @@ import com.google.protobuf.TextFormat;
@Private
@Unstable
public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
-
RefreshNodesRequestProto proto =
RefreshNodesRequestProto.getDefaultInstance();
RefreshNodesRequestProto.Builder builder = null;
boolean viaProto = false;
@@ -108,6 +107,22 @@ public class RefreshNodesRequestPBImpl extends
RefreshNodesRequest {
return convertFromProtoFormat(p.getDecommissionType());
}
+ @Override
+ public synchronized void setDecommissionTimeout(Integer timeout) {
+ maybeInitBuilder();
+ if (timeout != null) {
+ builder.setDecommissionTimeout(timeout);
+ } else {
+ builder.clearDecommissionTimeout();
+ }
+ }
+
+ @Override
+ public synchronized Integer getDecommissionTimeout() {
+ RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
+ }
+
private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
return DecommissionType.valueOf(p.name());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e77e990..e956507 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2586,6 +2586,24 @@
</property>
<property>
+ <description>
+ Timeout in seconds for YARN node graceful decommission.
+ This is the maximal time to wait for running containers and applications
to complete
+ before transitioning a DECOMMISSIONING node into DECOMMISSIONED.
+ </description>
+
<name>yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs</name>
+ <value>3600</value>
+ </property>
+
+ <property>
+ <description>
+ Interval in seconds of the DecommissioningNodesWatcher internal polling.
+ </description>
+
<name>yarn.resourcemanager.decommissioning-nodes-watcher.poll-interval-secs</name>
+ <value>20</value>
+ </property>
+
+ <property>
<description>The Node Label script to run. Script output Line starting with
"NODE_PARTITION:" will be considered as Node Label Partition. In case of
multiple lines have this pattern, then last one will be considered
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 2ec03aa..db55264 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -447,7 +447,8 @@ public class AdminService extends CompositeService
implements
rmContext.getNodesListManager().refreshNodes(conf);
break;
case GRACEFUL:
- rmContext.getNodesListManager().refreshNodesGracefully(conf);
+ rmContext.getNodesListManager().refreshNodesGracefully(
+ conf, request.getDecommissionTimeout());
break;
case FORCEFUL:
rmContext.getNodesListManager().refreshNodesForcefully();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
new file mode 100644
index 0000000..376b503
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+/**
+ * DecommissioningNodesWatcher is used by ResourceTrackerService to track
+ * DECOMMISSIONING nodes to decide when, after all running containers on
+ * the node have completed, it will be transitioned into DECOMMISSIONED
+ * state (NodeManager will be told to shut down).
+ * Under an MR application, a node, after completing all its containers,
+ * may still serve its map output data for the duration of the application
+ * for reducers. A fully graceful mechanism would keep such DECOMMISSIONING
+ * nodes until all involved applications complete. It could be however
+ * undesirable under long-running applications scenario where a bunch
+ * of "idle" nodes would stay around for long period of time.
+ *
+ * DecommissioningNodesWatcher balances such concerns with a timeout policy ---
+ * a DECOMMISSIONING node will be DECOMMISSIONED no later than
+ * DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
+ *
+ * To be efficient, DecommissioningNodesWatcher skips tracking application
+ * containers on a particular node before the node is in DECOMMISSIONING state.
+ * It only tracks containers once the node is in DECOMMISSIONING state.
+ * DecommissioningNodesWatcher basically incurs no cost when no node is
+ * DECOMMISSIONING. This sacrifices the possibility that the node once
+ * hosted containers of an application that is still running
+ * (the affected map tasks will be rescheduled).
+ */
+public class DecommissioningNodesWatcher {
+ private static final Log LOG =
+ LogFactory.getLog(DecommissioningNodesWatcher.class);
+
+ private final RMContext rmContext;
+
+ // Default timeout value in milliseconds.
+ // Negative value indicates no timeout. 0 means immediate.
+ private long defaultTimeoutMs =
+ 1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
+
+ // Once an RMNode is observed in DECOMMISSIONING state,
+ // all its ContainerStatus updates are tracked inside DecomNodeContext.
+ class DecommissioningNodeContext {
+ private final NodeId nodeId;
+
+ // Last known NodeState.
+ private NodeState nodeState;
+
+ // The moment node is observed in DECOMMISSIONING state.
+ private final long decommissioningStartTime;
+
+ private long lastContainerFinishTime;
+
+ // number of running containers at the moment.
+ private int numActiveContainers;
+
+ // All applications run on the node at or after decommissioningStartTime.
+ private Set<ApplicationId> appIds;
+
+ // First moment the node is observed in DECOMMISSIONED state.
+ private long decommissionedTime;
+
+ // Timeout in millis for this decommissioning node.
+ // This value could be dynamically updated with new value from RMNode.
+ private long timeoutMs;
+
+ private long lastUpdateTime;
+
+ public DecommissioningNodeContext(NodeId nodeId) {
+ this.nodeId = nodeId;
+ this.appIds = new HashSet<ApplicationId>();
+ this.decommissioningStartTime = mclock.getTime();
+ this.timeoutMs = defaultTimeoutMs;
+ }
+
+ void updateTimeout(Integer timeoutSec) {
+ this.timeoutMs = (timeoutSec == null)?
+ defaultTimeoutMs : (1000L * timeoutSec);
+ }
+ }
+
+ // All DECOMMISSIONING nodes to track.
+ private HashMap<NodeId, DecommissioningNodeContext> decomNodes =
+ new HashMap<NodeId, DecommissioningNodeContext>();
+
+ private Timer pollTimer;
+ private MonotonicClock mclock;
+
+ public DecommissioningNodesWatcher(RMContext rmContext) {
+ this.rmContext = rmContext;
+ pollTimer = new Timer(true);
+ mclock = new MonotonicClock();
+ }
+
+ public void init(Configuration conf) {
+ readDecommissioningTimeout(conf);
+ int v = conf.getInt(
+ YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
+ YarnConfiguration
+ .DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL);
+ pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v));
+ }
+
+ /**
+ * Update rmNode decommissioning status based on NodeStatus.
+ * @param rmNode The node
+ * @param remoteNodeStatus latest NodeStatus
+ */
+ public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) {
+ DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID());
+ long now = mclock.getTime();
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ if (context == null) {
+ return;
+ }
+ context.nodeState = rmNode.getState();
+ // keep DECOMMISSIONED node for a while for status log, so that such
+ // host will appear as DECOMMISSIONED instead of quietly disappearing.
+ if (context.decommissionedTime == 0) {
+ context.decommissionedTime = now;
+ } else if (now - context.decommissionedTime > 60000L) {
+ decomNodes.remove(rmNode.getNodeID());
+ }
+ } else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+ if (context == null) {
+ context = new DecommissioningNodeContext(rmNode.getNodeID());
+ decomNodes.put(rmNode.getNodeID(), context);
+ context.nodeState = rmNode.getState();
+ context.decommissionedTime = 0;
+ }
+ context.updateTimeout(rmNode.getDecommissioningTimeout());
+ context.lastUpdateTime = now;
+
+ if (remoteNodeStatus.getKeepAliveApplications() != null) {
+ context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
+ }
+
+ // Count number of active containers.
+ int numActiveContainers = 0;
+ for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) {
+ ContainerState newState = cs.getState();
+ if (newState == ContainerState.RUNNING ||
+ newState == ContainerState.NEW) {
+ numActiveContainers++;
+ }
+ context.numActiveContainers = numActiveContainers;
+ ApplicationId aid = cs.getContainerId()
+ .getApplicationAttemptId().getApplicationId();
+ if (!context.appIds.contains(aid)) {
+ context.appIds.add(aid);
+ }
+ }
+
+ context.numActiveContainers = numActiveContainers;
+
+ // maintain lastContainerFinishTime.
+ if (context.numActiveContainers == 0 &&
+ context.lastContainerFinishTime == 0) {
+ context.lastContainerFinishTime = now;
+ }
+ } else {
+ // remove node in other states
+ if (context != null) {
+ decomNodes.remove(rmNode.getNodeID());
+ }
+ }
+ }
+
+ public synchronized void remove(NodeId nodeId) {
+ DecommissioningNodeContext context = decomNodes.get(nodeId);
+ if (context != null) {
+ LOG.info("remove " + nodeId + " in " + context.nodeState);
+ decomNodes.remove(nodeId);
+ }
+ }
+
+ /**
+ * Status about a specific decommissioning node.
+ *
+ */
+ public enum DecommissioningNodeStatus {
+ // Node is not in DECOMMISSIONING state.
+ NONE,
+
+ // wait for running containers to complete
+ WAIT_CONTAINER,
+
+ // wait for running application to complete (after all containers
complete);
+ WAIT_APP,
+
+ // Timeout waiting for either containers or applications to complete.
+ TIMEOUT,
+
+ // nothing to wait, ready to be decommissioned
+ READY,
+
+ // The node has already been decommissioned
+ DECOMMISSIONED,
+ }
+
+ public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
+ DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId);
+ return (s == DecommissioningNodeStatus.READY ||
+ s == DecommissioningNodeStatus.TIMEOUT);
+ }
+
+ public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) {
+ DecommissioningNodeContext context = decomNodes.get(nodeId);
+ if (context == null) {
+ return DecommissioningNodeStatus.NONE;
+ }
+
+ if (context.nodeState == NodeState.DECOMMISSIONED) {
+ return DecommissioningNodeStatus.DECOMMISSIONED;
+ }
+
+ long waitTime = mclock.getTime() - context.decommissioningStartTime;
+ if (context.numActiveContainers > 0) {
+ return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+ DecommissioningNodeStatus.WAIT_CONTAINER :
+ DecommissioningNodeStatus.TIMEOUT;
+ }
+
+ removeCompletedApps(context);
+ if (context.appIds.size() == 0) {
+ return DecommissioningNodeStatus.READY;
+ } else {
+ return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+ DecommissioningNodeStatus.WAIT_APP :
+ DecommissioningNodeStatus.TIMEOUT;
+ }
+ }
+
+ /**
+ * PollTimerTask periodically:
+ * 1. log status of all DECOMMISSIONING nodes;
+ * 2. identify and take care of stale DECOMMISSIONING nodes
+ * (for example, node already terminated).
+ */
+ class PollTimerTask extends TimerTask {
+ private final RMContext rmContext;
+
+ public PollTimerTask(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ public void run() {
+ logDecommissioningNodesStatus();
+ long now = mclock.getTime();
+ Set<NodeId> staleNodes = new HashSet<NodeId>();
+
+ for (Iterator<Map.Entry<NodeId, DecommissioningNodeContext>> it =
+ decomNodes.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<NodeId, DecommissioningNodeContext> e = it.next();
+ DecommissioningNodeContext d = e.getValue();
+ // Skip node recently updated (NM usually updates every second).
+ if (now - d.lastUpdateTime < 5000L) {
+ continue;
+ }
+ // Remove stale non-DECOMMISSIONING node
+ if (d.nodeState != NodeState.DECOMMISSIONING) {
+ LOG.debug("remove " + d.nodeState + " " + d.nodeId);
+ it.remove();
+ continue;
+ } else if (now - d.lastUpdateTime > 60000L) {
+ // Node DECOMMISSIONED could become stale, remove as necessary.
+ RMNode rmNode = getRmNode(d.nodeId);
+ if (rmNode != null &&
+ rmNode.getState() == NodeState.DECOMMISSIONED) {
+ LOG.debug("remove " + rmNode.getState() + " " + d.nodeId);
+ it.remove();
+ continue;
+ }
+ }
+ if (d.timeoutMs >= 0 &&
+ d.decommissioningStartTime + d.timeoutMs < now) {
+ staleNodes.add(d.nodeId);
+ LOG.debug("Identified stale and timeout node " + d.nodeId);
+ }
+ }
+
+ for (NodeId nodeId : staleNodes) {
+ RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+ if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) {
+ remove(nodeId);
+ continue;
+ }
+ if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+ checkReadyToBeDecommissioned(rmNode.getNodeID())) {
+ LOG.info("DECOMMISSIONING " + nodeId + " timeout");
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ }
+ }
+ }
+ }
+
+ private RMNode getRmNode(NodeId nodeId) {
+ RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+ if (rmNode == null) {
+ rmNode = this.rmContext.getInactiveRMNodes().get(nodeId);
+ }
+ return rmNode;
+ }
+
+ private void removeCompletedApps(DecommissioningNodeContext context) {
+ Iterator<ApplicationId> it = context.appIds.iterator();
+ while (it.hasNext()) {
+ ApplicationId appId = it.next();
+ RMApp rmApp = rmContext.getRMApps().get(appId);
+ if (rmApp == null) {
+ LOG.debug("Consider non-existing app " + appId + " as completed");
+ it.remove();
+ continue;
+ }
+ if (rmApp.getState() == RMAppState.FINISHED ||
+ rmApp.getState() == RMAppState.FAILED ||
+ rmApp.getState() == RMAppState.KILLED) {
+ LOG.debug("Remove " + rmApp.getState() + " app " + appId);
+ it.remove();
+ }
+ }
+ }
+
+ // Time in seconds until the node is decommissioned.
+ private int getTimeoutInSec(DecommissioningNodeContext context) {
+ if (context.nodeState == NodeState.DECOMMISSIONED) {
+ return 0;
+ } else if (context.nodeState != NodeState.DECOMMISSIONING) {
+ return -1;
+ }
+ if (context.appIds.size() == 0 && context.numActiveContainers == 0) {
+ return 0;
+ }
+ // negative timeout value means no timeout (infinite timeout).
+ if (context.timeoutMs < 0) {
+ return -1;
+ }
+
+ long now = mclock.getTime();
+ long timeout = context.decommissioningStartTime + context.timeoutMs - now;
+ return Math.max(0, (int)(timeout / 1000));
+ }
+
+ private void logDecommissioningNodesStatus() {
+ if (!LOG.isDebugEnabled() || decomNodes.size() == 0) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ long now = mclock.getTime();
+ for (DecommissioningNodeContext d : decomNodes.values()) {
+ DecommissioningNodeStatus s = checkDecommissioningStatus(d.nodeId);
+ sb.append(String.format(
+ "%n %-34s %4ds fresh:%3ds containers:%2d %14s",
+ d.nodeId.getHost(),
+ (now - d.decommissioningStartTime) / 1000,
+ (now - d.lastUpdateTime) / 1000,
+ d.numActiveContainers,
+ s));
+ if (s == DecommissioningNodeStatus.WAIT_APP ||
+ s == DecommissioningNodeStatus.WAIT_CONTAINER) {
+ sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d)));
+ }
+ for (ApplicationId aid : d.appIds) {
+ sb.append("\n " + aid);
+ RMApp rmApp = rmContext.getRMApps().get(aid);
+ if (rmApp != null) {
+ sb.append(String.format(
+ " %s %9s %5.2f%% %5ds",
+ rmApp.getState(),
+ (rmApp.getApplicationType() == null)?
+ "" : rmApp.getApplicationType(),
+ 100.0 * rmApp.getProgress(),
+ (mclock.getTime() - rmApp.getStartTime()) / 1000));
+ }
+ }
+ }
+ LOG.info("Decommissioning Nodes: " + sb.toString());
+ }
+
+ // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
+ // This enables DecommissioningNodesWatcher to pick up new value
+ // without ResourceManager restart.
+ private void readDecommissioningTimeout(Configuration conf) {
+ try {
+ if (conf == null) {
+ conf = new YarnConfiguration();
+ }
+ int v = conf.getInt(
+ YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+ if (defaultTimeoutMs != 1000L * v) {
+ defaultTimeoutMs = 1000L * v;
+ LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
+ }
+ } catch (Exception e) {
+ LOG.info("Error readDecommissioningTimeout ", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 7937383..99413bc 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -19,14 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
-import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +41,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -47,14 +52,15 @@ import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import com.google.common.annotations.VisibleForTesting;
+
@SuppressWarnings("unchecked")
public class NodesListManager extends CompositeService implements
EventHandler<NodesListManagerEvent> {
@@ -178,10 +184,11 @@ public class NodesListManager extends CompositeService
implements
if (!LOG.isDebugEnabled()) {
return;
}
-
- LOG.debug("hostsReader: in=" +
conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+
+ LOG.debug("hostsReader: in=" +
+ conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" +
- conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
Set<String> hostsList = new HashSet<String>();
@@ -196,23 +203,19 @@ public class NodesListManager extends CompositeService
implements
}
}
- public void refreshNodes(Configuration yarnConf) throws IOException,
- YarnException {
- refreshHostsReader(yarnConf);
+ public void refreshNodes(Configuration yarnConf)
+ throws IOException, YarnException {
+ refreshNodes(yarnConf, false);
+ }
- for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
- if (!isValidNode(nodeId.getHost())) {
- RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
- RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, nodeEventType));
- }
- }
- updateInactiveNodes();
+ public void refreshNodes(Configuration yarnConf, boolean graceful)
+ throws IOException, YarnException {
+ refreshHostsReader(yarnConf, graceful, null);
}
- private void refreshHostsReader(Configuration yarnConf) throws IOException,
- YarnException {
+ private void refreshHostsReader(
+ Configuration yarnConf, boolean graceful, Integer timeout)
+ throws IOException, YarnException {
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
}
@@ -222,8 +225,16 @@ public class NodesListManager extends CompositeService
implements
excludesFile =
yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
+ LOG.info("refreshNodes excludesFile " + excludesFile);
hostsReader.refresh(includesFile, excludesFile);
printConfiguredHosts();
+
+ LOG.info("hostsReader include:{" +
+ StringUtils.join(",", hostsReader.getHosts()) +
+ "} exclude:{" +
+ StringUtils.join(",", hostsReader.getExcludedHosts()) + "}");
+
+ handleExcludeNodeList(graceful, timeout);
}
private void setDecomissionedNMs() {
@@ -237,6 +248,86 @@ public class NodesListManager extends CompositeService
implements
}
}
+ // Handle excluded nodes based on following rules:
+ // Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded;
+ // Gracefully decommission excluded nodes that are not already
+ // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes
+ // that are already DECOMMISSIONED or DECOMMISSIONING.
+ private void handleExcludeNodeList(boolean graceful, Integer timeout) {
+ // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
+ List<RMNode> nodesToRecom = new ArrayList<RMNode>();
+
+ // Nodes need to be decommissioned (graceful or forceful);
+ List<RMNode> nodesToDecom = new ArrayList<RMNode>();
+
+ Set<String> includes = new HashSet<String>();
+ Map<String, Integer> excludes = new HashMap<String, Integer>();
+ hostsReader.getHostDetails(includes, excludes);
+
+ for (RMNode n : this.rmContext.getRMNodes().values()) {
+ NodeState s = n.getState();
+ // An invalid node (either due to explicit exclude or not include)
+ // should be excluded.
+ boolean isExcluded = !isValidNode(
+ n.getHostName(), includes, excludes.keySet());
+ String nodeStr = "node " + n.getNodeID() + " with state " + s;
+ if (!isExcluded) {
+ // Note that no action is needed for DECOMMISSIONED node.
+ if (s == NodeState.DECOMMISSIONING) {
+ LOG.info("Recommission " + nodeStr);
+ nodesToRecom.add(n);
+ }
+ // Otherwise no-action needed.
+ } else {
+ // exclude is true.
+ if (graceful) {
+ // Use per node timeout if exist otherwise the request timeout.
+ Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
+ excludes.get(n.getHostName()) : timeout;
+ if (s != NodeState.DECOMMISSIONED &&
+ s != NodeState.DECOMMISSIONING) {
+ LOG.info("Gracefully decommission " + nodeStr);
+ nodesToDecom.add(n);
+ } else if (s == NodeState.DECOMMISSIONING &&
+ !Objects.equals(n.getDecommissioningTimeout(),
+ timeoutToUse)) {
+ LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
+ nodesToDecom.add(n);
+ } else {
+ LOG.info("No action for " + nodeStr);
+ }
+ } else {
+ if (s != NodeState.DECOMMISSIONED) {
+ LOG.info("Forcefully decommission " + nodeStr);
+ nodesToDecom.add(n);
+ }
+ }
+ }
+ }
+
+ for (RMNode n : nodesToRecom) {
+ RMNodeEvent e = new RMNodeEvent(
+ n.getNodeID(), RMNodeEventType.RECOMMISSION);
+ this.rmContext.getDispatcher().getEventHandler().handle(e);
+ }
+
+ for (RMNode n : nodesToDecom) {
+ RMNodeEvent e;
+ if (graceful) {
+ Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
+ excludes.get(n.getHostName()) : timeout;
+ e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse);
+ } else {
+ RMNodeEventType eventType = isUntrackedNode(n.getHostName())?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
+ e = new RMNodeEvent(n.getNodeID(), eventType);
+ }
+ this.rmContext.getDispatcher().getEventHandler().handle(e);
+ }
+
+ updateInactiveNodes();
+ }
+
@VisibleForTesting
public int getNodeRemovalCheckInterval() {
return nodeRemovalCheckInterval;
@@ -360,11 +451,15 @@ public class NodesListManager extends CompositeService
implements
}
public boolean isValidNode(String hostName) {
- String ip = resolver.resolve(hostName);
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
+ return isValidNode(hostName, hostsList, excludeList);
+ }
+ private boolean isValidNode(
+ String hostName, Set<String> hostsList, Set<String> excludeList) {
+ String ip = resolver.resolve(hostName);
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
@@ -478,29 +573,14 @@ public class NodesListManager extends CompositeService
implements
/**
* Refresh the nodes gracefully
*
- * @param conf
+   * @param yarnConf configuration used to refresh the hosts reader.
+ * @param timeout decommission timeout, null means default timeout.
* @throws IOException
* @throws YarnException
*/
- public void refreshNodesGracefully(Configuration conf) throws IOException,
- YarnException {
- refreshHostsReader(conf);
- for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
- NodeId nodeId = entry.getKey();
- if (!isValidNode(nodeId.getHost())) {
- RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
- RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, nodeEventType));
- } else {
- // Recommissioning the nodes
- if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION));
- }
- }
- }
- updateInactiveNodes();
+ public void refreshNodesGracefully(Configuration yarnConf, Integer timeout)
+ throws IOException, YarnException {
+ refreshHostsReader(yarnConf, true, timeout);
}
/**
@@ -596,4 +676,4 @@ public class NodesListManager extends CompositeService
implements
this.host = hst;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 9b9b02e..5e9827a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -70,7 +70,7 @@ public class RMServerUtils {
public static List<RMNode> queryRMNodes(RMContext context,
EnumSet<NodeState> acceptedStates) {
- // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY
+ // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or
DECOMMISSIONING.
ArrayList<RMNode> results = new ArrayList<RMNode>();
if (acceptedStates.contains(NodeState.NEW) ||
acceptedStates.contains(NodeState.RUNNING) ||
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 9d480f3..51fc0bd 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -113,6 +113,8 @@ public class ResourceTrackerService extends AbstractService
implements
private int minAllocMb;
private int minAllocVcores;
+ private DecommissioningNodesWatcher decommissioningWatcher;
+
private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf;
private DynamicResourceConfiguration drConf;
@@ -131,6 +133,7 @@ public class ResourceTrackerService extends AbstractService
implements
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
+ this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext);
}
@Override
@@ -170,6 +173,7 @@ public class ResourceTrackerService extends AbstractService
implements
}
loadDynamicResourceConfiguration(conf);
+ decommissioningWatcher.init(conf);
super.serviceInit(conf);
}
@@ -494,6 +498,7 @@ public class ResourceTrackerService extends AbstractService
implements
// Send ping
this.nmLivelinessMonitor.receivedPing(nodeId);
+ this.decommissioningWatcher.update(rmNode, remoteNodeStatus);
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse =
rmNode.getLastNodeHeartBeatResponse();
@@ -526,6 +531,20 @@ public class ResourceTrackerService extends
AbstractService implements
updateAppCollectorsMap(request);
}
+ // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED.
+ if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+ decommissioningWatcher.checkReadyToBeDecommissioned(
+ rmNode.getNodeID())) {
+ String message = "DECOMMISSIONING " + nodeId +
+ " is ready to be decommissioned";
+ LOG.info(message);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ this.nmLivelinessMonitor.unregister(nodeId);
+ return YarnServerBuilderUtils.newNodeHeartbeatResponse(
+ NodeAction.SHUTDOWN, message);
+ }
+
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 3a9cf54..10e2afa 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -175,4 +175,10 @@ public interface RMNode {
long getUntrackedTimeStamp();
void setUntrackedTimeStamp(long timeStamp);
+  /**
+   * Optional decommissioning timeout in seconds
+   * (null indicates the default timeout).
+   * @return the decommissioning timeout in seconds.
+   */
+ Integer getDecommissioningTimeout();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
new file mode 100644
index 0000000..9955e9e
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * RMNode Decommissioning Event.
+ *
+ */
+public class RMNodeDecommissioningEvent extends RMNodeEvent {
+  // Optional decommissioning timeout in seconds.
+ private final Integer decommissioningTimeout;
+
+ // Create instance with optional timeout
+ // (timeout could be null which means use default).
+ public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) {
+ super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION);
+ this.decommissioningTimeout = timeout;
+ }
+
+ public Integer getDecommissioningTimeout() {
+ return this.decommissioningTimeout;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index a3a6b30..d1ccecb 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -124,6 +125,7 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
private String healthReport;
private long lastHealthReportTime;
private String nodeManagerVersion;
+ private Integer decommissioningTimeout;
private long timeStamp;
/* Aggregated resource utilization for the containers. */
@@ -179,7 +181,6 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
NodeState,
RMNodeEventType,
RMNodeEvent>(NodeState.NEW)
-
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
@@ -265,6 +266,9 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
.addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
RMNodeEventType.REBOOTING,
new DeactivateNodeTransition(NodeState.REBOOTED))
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
@@ -633,7 +637,7 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Invalid event " + event.getType() +
- " on Node " + this.nodeId);
+ " on Node " + this.nodeId + " oldState " + oldState);
}
if (oldState != getState()) {
LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
@@ -666,6 +670,9 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
case SHUTDOWN:
metrics.decrNumShutdownNMs();
break;
+ case DECOMMISSIONING:
+ metrics.decrDecommissioningNMs();
+ break;
default:
LOG.debug("Unexpected previous node state");
}
@@ -712,6 +719,9 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
case DECOMMISSIONING:
metrics.decrDecommissioningNMs();
break;
+ case DECOMMISSIONED:
+ metrics.decrDecommisionedNMs();
+ break;
case UNHEALTHY:
metrics.decrNumUnhealthyNMs();
break;
@@ -1087,9 +1097,26 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ Integer timeout = null;
+ if (RMNodeDecommissioningEvent.class.isInstance(event)) {
+ RMNodeDecommissioningEvent e = ((RMNodeDecommissioningEvent) event);
+ timeout = e.getDecommissioningTimeout();
+ }
+ // Pick up possible updates on decommissioningTimeout.
+ if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+ if (!Objects.equals(rmNode.getDecommissioningTimeout(), timeout)) {
+ LOG.info("Update " + rmNode.getNodeID() +
+ " DecommissioningTimeout to be " + timeout);
+ rmNode.decommissioningTimeout = timeout;
+ } else {
+ LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING");
+ }
+ return;
+ }
LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
// Update NM metrics during graceful decommissioning.
rmNode.updateMetricsForGracefulDecommission(initState, finalState);
+ rmNode.decommissioningTimeout = timeout;
if (rmNode.originalTotalCapability == null){
rmNode.originalTotalCapability =
Resources.clone(rmNode.totalCapability);
@@ -1156,24 +1183,6 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
return NodeState.UNHEALTHY;
}
}
- if (isNodeDecommissioning) {
- List<ApplicationId> runningApps = rmNode.getRunningApps();
-
- List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
-
- // no running (and keeping alive) app on this node, get it
- // decommissioned.
- // TODO may need to check no container is being scheduled on this node
- // as well.
- if ((runningApps == null || runningApps.size() == 0)
- && (keepAliveApps == null || keepAliveApps.size() == 0)) {
- RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
- return NodeState.DECOMMISSIONED;
- }
-
- // TODO (in YARN-3223) if node in decommissioning, get node resource
- // updated if container get finished (keep available resource to be 0)
- }
rmNode.handleContainerStatus(statusEvent.getContainers());
rmNode.handleReportedIncreasedContainers(
@@ -1472,4 +1481,9 @@ public class RMNodeImpl implements RMNode,
EventHandler<RMNodeEvent> {
public void setUntrackedTimeStamp(long ts) {
this.timeStamp = ts;
}
+
+ @Override
+ public Integer getDecommissioningTimeout() {
+ return decommissioningTimeout;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
index 3012d0d..1789e09 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
@@ -97,7 +97,7 @@ public class ClusterMetricsInfo {
this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
this.totalNodes = activeNodes + lostNodes + decommissionedNodes
- + rebootedNodes + unhealthyNodes + shutdownNodes;
+ + rebootedNodes + unhealthyNodes + decommissioningNodes +
shutdownNodes;
}
public int getAppsSubmitted() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]