goiri commented on code in PR #5905:
URL: https://github.com/apache/hadoop/pull/5905#discussion_r1312369607
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java:
##########
@@ -162,6 +162,80 @@ public void
testDecommissioningNodesWatcherWithPreviousRunningApps()
watcher.checkDecommissioningStatus(id1));
}
+ @Test
+ public void testDecommissioningNodesWatcherWithScheduledAMContainers()
+ throws Exception {
+ Configuration conf = new Configuration();
+ // decommission timeout is 10 min
+ conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "600");
Review Comment:
Could we use setInt, or ideally setTimeDuration, instead of passing the value as a string?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java:
##########
@@ -1439,10 +1439,15 @@ public NodeState transition(RMNodeImpl rmNode,
RMNodeEvent event) {
* @return true if node has any AM scheduled on it.
*/
private boolean hasScheduledAMContainers(RMNodeImpl rmNode) {
- return rmNode.context.getScheduler()
+ boolean hasScheduledAMContainers = rmNode.context.getScheduler()
.getSchedulerNode(rmNode.getNodeID())
.getCopiedListOfRunningContainers()
.stream().anyMatch(RMContainer::isAMContainer);
+ if (hasScheduledAMContainers) {
+ LOG.info("Node " + rmNode.nodeId + " has AM containers scheduled on it."
Review Comment:
Use the parameterized {} logger format instead of string concatenation.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java:
##########
@@ -162,6 +162,80 @@ public void
testDecommissioningNodesWatcherWithPreviousRunningApps()
watcher.checkDecommissioningStatus(id1));
}
+ @Test
+ public void testDecommissioningNodesWatcherWithScheduledAMContainers()
+ throws Exception {
+ Configuration conf = new Configuration();
+ // decommission timeout is 10 min
+ conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "600");
+ // watcher delay is 5min
+ conf.set(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
"300000");
+
+ rm = new MockRM(conf);
+ rm.start();
+
+ DecommissioningNodesWatcher watcher =
+ new DecommissioningNodesWatcher(rm.getRMContext());
+ watcher.init(conf);
+
+ MockNM nm1 = rm.registerNode("host1:1234", 10240);
+ RMNodeImpl node1 =
+ (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ NodeId id1 = nm1.getNodeId();
+
+ rm.waitForState(id1, NodeState.RUNNING);
+
+ // just submit the app
+ RMApp app = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(2000,
rm).build());
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+ // rmnode still thinks there are 0 apps because it hasn't received updated node status
+ Assert.assertEquals(0, node1.getRunningApps().size());
+
+ // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. Right now
+ // there is no container running on the node.
+ rm.sendNodeGracefulDecommission(nm1,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+ rm.waitForState(id1, NodeState.DECOMMISSIONING);
+
+ // we should still get WAIT_SCHEDULED_APPS as expiry time is not over
+ NodeHealthStatus status = NodeHealthStatus.newInstance(true, "",
+ System.currentTimeMillis() - 1000);
+ NodeStatus nodeStatus = NodeStatus.newInstance(id1, 0,
+ new ArrayList<>(), Collections.emptyList(), status, null, null,
null);
+ watcher.update(node1, nodeStatus);
+
+ Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+ Assert.assertEquals(DecommissioningNodeStatus.WAIT_SCHEDULED_APPS,
+ watcher.checkDecommissioningStatus(id1));
+
+ // update node with 3 running containers
+ nodeStatus = createNodeStatus(id1, app, 3);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+ watcher.update(node1, nodeStatus);
+ Assert.assertEquals(1, node1.getRunningApps().size());
+ Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+ Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
+ watcher.checkDecommissioningStatus(id1));
+
+ // update node with 0 running containers
+ nodeStatus = createNodeStatus(id1, app, 0);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+ watcher.update(node1, nodeStatus);
+ Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+ // we still get status as WAIT_APP since app is still running even if
+ // containers are 0
+ Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
+ watcher.checkDecommissioningStatus(id1));
+
+ // Set app to be FINISHED and verified DecommissioningNodeStatus is READY.
+ MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+ rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+ Assert.assertEquals(0, node1.getRunningApps().size());
+ watcher.update(node1, nodeStatus);
Review Comment:
Why do we call update here if nothing is asserted afterwards?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java:
##########
@@ -351,6 +351,65 @@ public void testGracefulDecommissionNoApp() throws
Exception {
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
}
+ @Test (timeout = 60000)
+ public void testGracefulDecommissionRaceCondition() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+ .getAbsolutePath());
+ conf.set(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
"120000");
+
+ writeToHostsFile("");
+ rm = new MockRM(conf);
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("host1:1234", 10240);
Review Comment:
Make it 10 * 1024 so the size is easier to read.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java:
##########
@@ -162,6 +162,80 @@ public void
testDecommissioningNodesWatcherWithPreviousRunningApps()
watcher.checkDecommissioningStatus(id1));
}
+ @Test
+ public void testDecommissioningNodesWatcherWithScheduledAMContainers()
+ throws Exception {
+ Configuration conf = new Configuration();
+ // decommission timeout is 10 min
+ conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "600");
+ // watcher delay is 5min
+ conf.set(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
"300000");
+
+ rm = new MockRM(conf);
+ rm.start();
+
+ DecommissioningNodesWatcher watcher =
+ new DecommissioningNodesWatcher(rm.getRMContext());
+ watcher.init(conf);
+
+ MockNM nm1 = rm.registerNode("host1:1234", 10240);
+ RMNodeImpl node1 =
+ (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ NodeId id1 = nm1.getNodeId();
+
+ rm.waitForState(id1, NodeState.RUNNING);
+
+ // just submit the app
+ RMApp app = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(2000,
rm).build());
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+ // rmnode still thinks there are 0 apps because it hasn't received updated node status
+ Assert.assertEquals(0, node1.getRunningApps().size());
+
+ // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. Right now
+ // there is no container running on the node.
+ rm.sendNodeGracefulDecommission(nm1,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+ rm.waitForState(id1, NodeState.DECOMMISSIONING);
+
+ // we should still get WAIT_SCHEDULED_APPS as expiry time is not over
+ NodeHealthStatus status = NodeHealthStatus.newInstance(true, "",
+ System.currentTimeMillis() - 1000);
Review Comment:
The indentation of this continuation line looks off.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java:
##########
@@ -126,9 +127,21 @@ public void init(Configuration conf) {
YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
YarnConfiguration
.DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL);
+ this.expireIntvl = setExpireInterval(conf);
pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v));
}
+ // set value of decommissioned nodes watcher delay to allow delayed removal of
+ // decommissioning nodes, but delay should not be more than RM_AM_EXPIRY_INTERVAL_MS
+ private long setExpireInterval(Configuration conf) {
+ return Math.min(
+ conf.getInt(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
Review Comment:
Could we use getTimeDuration()?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]