This is an automated email from the ASF dual-hosted git repository.
prabhujoseph pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 24b86cc84e5 YARN-11197.Backport YARN-9608 -
DecommissioningNodesWatcher should get lists of running applications on node
from RMNode. (#4500)
24b86cc84e5 is described below
commit 24b86cc84e552a316ef3efda1f88ee0b7f768b77
Author: Ashutosh Gupta <[email protected]>
AuthorDate: Tue Jun 28 05:03:55 2022 +0100
YARN-11197.Backport YARN-9608 - DecommissioningNodesWatcher should get
lists of running applications on node from RMNode. (#4500)
Co-authored-by: Ashutosh Gupta <[email protected]>
---
.../DecommissioningNodesWatcher.java | 47 ++-------
.../TestDecommissioningNodesWatcher.java | 106 +++++++++++++++++----
2 files changed, 95 insertions(+), 58 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
index ca3eb798414..707a8fb1ec2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
@@ -17,9 +17,11 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
@@ -36,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -58,13 +59,8 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
* a DECOMMISSIONING node will be DECOMMISSIONED no later than
* DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
*
- * To be efficient, DecommissioningNodesWatcher skip tracking application
- * containers on a particular node before the node is in DECOMMISSIONING state.
- * It only tracks containers once the node is in DECOMMISSIONING state.
* DecommissioningNodesWatcher basically is no cost when no node is
- * DECOMMISSIONING. This sacrifices the possibility that the node once
- * host containers of an application that is still running
- * (the affected map tasks will be rescheduled).
+ * DECOMMISSIONING.
*/
public class DecommissioningNodesWatcher {
private static final Log LOG =
@@ -88,8 +84,8 @@ public class DecommissioningNodesWatcher {
// number of running containers at the moment.
private int numActiveContainers;
- // All applications run on the node at or after decommissioningStartTime.
- private Set<ApplicationId> appIds;
+ // All applications run on the node.
+ private List<ApplicationId> appIds;
// First moment the node is observed in DECOMMISSIONED state.
private long decommissionedTime;
@@ -102,7 +98,7 @@ public class DecommissioningNodesWatcher {
public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
this.nodeId = nodeId;
- this.appIds = new HashSet<ApplicationId>();
+ this.appIds = new ArrayList<>();
this.decommissioningStartTime = mclock.getTime();
this.timeoutMs = 1000L * timeoutSec;
}
@@ -164,9 +160,7 @@ public class DecommissioningNodesWatcher {
context.updateTimeout(rmNode.getDecommissioningTimeout());
context.lastUpdateTime = now;
- if (remoteNodeStatus.getKeepAliveApplications() != null) {
- context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
- }
+ context.appIds = rmNode.getRunningApps();
// Count number of active containers.
int numActiveContainers = 0;
@@ -176,14 +170,7 @@ public class DecommissioningNodesWatcher {
newState == ContainerState.NEW) {
numActiveContainers++;
}
- context.numActiveContainers = numActiveContainers;
- ApplicationId aid = cs.getContainerId()
- .getApplicationAttemptId().getApplicationId();
- if (!context.appIds.contains(aid)) {
- context.appIds.add(aid);
- }
}
-
context.numActiveContainers = numActiveContainers;
// maintain lastContainerFinishTime.
@@ -254,7 +241,6 @@ public class DecommissioningNodesWatcher {
DecommissioningNodeStatus.TIMEOUT;
}
- removeCompletedApps(context);
if (context.appIds.size() == 0) {
return DecommissioningNodeStatus.READY;
} else {
@@ -336,25 +322,6 @@ public class DecommissioningNodesWatcher {
return rmNode;
}
- private void removeCompletedApps(DecommissioningNodeContext context) {
- Iterator<ApplicationId> it = context.appIds.iterator();
- while (it.hasNext()) {
- ApplicationId appId = it.next();
- RMApp rmApp = rmContext.getRMApps().get(appId);
- if (rmApp == null) {
- LOG.debug("Consider non-existing app " + appId + " as completed");
- it.remove();
- continue;
- }
- if (rmApp.getState() == RMAppState.FINISHED ||
- rmApp.getState() == RMAppState.FAILED ||
- rmApp.getState() == RMAppState.KILLED) {
- LOG.debug("Remove " + rmApp.getState() + " app " + appId);
- it.remove();
- }
- }
- }
-
// Time in second to be decommissioned.
private int getTimeoutInSec(DecommissioningNodeContext context) {
if (context.nodeState == NodeState.DECOMMISSIONED) {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
index 09f1482eb5a..40c100aec91 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
@@ -21,10 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,7 +35,12 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import
org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
/**
* This class tests DecommissioningNodesWatcher.
@@ -57,42 +58,111 @@ public class TestDecommissioningNodesWatcher {
DecommissioningNodesWatcher watcher =
new DecommissioningNodesWatcher(rm.getRMContext());
-
MockNM nm1 = rm.registerNode("host1:1234", 10240);
- RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNodeImpl node1 =
+ (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
NodeId id1 = nm1.getNodeId();
rm.waitForState(id1, NodeState.RUNNING);
- Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+
RMApp app = rm.submitApp(2000);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+ NodeStatus nodeStatus = createNodeStatus(id1, app, 3);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+
// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
rm.sendNodeGracefulDecommission(nm1,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
// Update status with decreasing number of running containers until 0.
- watcher.update(node1, createNodeStatus(id1, app, 12));
- watcher.update(node1, createNodeStatus(id1, app, 11));
+ nodeStatus = createNodeStatus(id1, app, 3);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+ watcher.update(node1, nodeStatus);
+
+ nodeStatus = createNodeStatus(id1, app, 2);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+ watcher.update(node1, nodeStatus);
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
- watcher.update(node1, createNodeStatus(id1, app, 1));
+ nodeStatus = createNodeStatus(id1, app, 1);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+ watcher.update(node1, nodeStatus);
Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
- watcher.checkDecommissioningStatus(id1));
+ watcher.checkDecommissioningStatus(id1));
- watcher.update(node1, createNodeStatus(id1, app, 0));
+ nodeStatus = createNodeStatus(id1, app, 0);
+ watcher.update(node1, nodeStatus);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
- watcher.checkDecommissioningStatus(id1));
+ watcher.checkDecommissioningStatus(id1));
// Set app to be FINISHED and verified DecommissioningNodeStatus is READY.
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+ watcher.update(node1, nodeStatus);
Assert.assertEquals(DecommissioningNodeStatus.READY,
- watcher.checkDecommissioningStatus(id1));
+ watcher.checkDecommissioningStatus(id1));
}
+ @Test
+ public void testDecommissioningNodesWatcherWithPreviousRunningApps()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40");
+
+ rm = new MockRM(conf);
+ rm.start();
+
+ DecommissioningNodesWatcher watcher =
+ new DecommissioningNodesWatcher(rm.getRMContext());
+
+ MockNM nm1 = rm.registerNode("host1:1234", 10240);
+ RMNodeImpl node1 =
+ (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ NodeId id1 = nm1.getNodeId();
+
+ rm.waitForState(id1, NodeState.RUNNING);
+
+ RMApp app = rm.submitApp(2000);
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+ NodeStatus nodeStatus = createNodeStatus(id1, app, 3);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+
+ Assert.assertEquals(1, node1.getRunningApps().size());
+
+ // update node with 0 running containers
+ nodeStatus = createNodeStatus(id1, app, 0);
+ node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+
+ Assert.assertEquals(1, node1.getRunningApps().size());
+
+ // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. Right now
+ // there is no container running on the node.
+ rm.sendNodeGracefulDecommission(nm1,
+ YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+ rm.waitForState(id1, NodeState.DECOMMISSIONING);
+
+ // We should still get WAIT_APP because a container of a still-running app
+ // previously ran on this node.
+ watcher.update(node1, nodeStatus);
+ Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+ Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
+ watcher.checkDecommissioningStatus(id1));
+
+ // Set app to be FINISHED and verify DecommissioningNodeStatus is READY.
+ MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+ rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+ Assert.assertEquals(0, node1.getRunningApps().size());
+ watcher.update(node1, nodeStatus);
+ Assert.assertEquals(DecommissioningNodeStatus.READY,
+ watcher.checkDecommissioningStatus(id1));
+ }
+
+
@After
public void tearDown() {
if (rm != null) {
@@ -115,7 +185,7 @@ public class TestDecommissioningNodesWatcher {
private List<ContainerStatus> getContainerStatuses(
RMApp app, int numRunningContainers) {
// Total 12 containers
- final int total = 12;
+ final int total = 3;
numRunningContainers = Math.min(total, numRunningContainers);
List<ContainerStatus> output = new ArrayList<ContainerStatus>();
for (int i = 0; i < total; i++) {
@@ -123,8 +193,8 @@ public class TestDecommissioningNodesWatcher {
ContainerState.COMPLETE : ContainerState.RUNNING;
output.add(ContainerStatus.newInstance(
ContainerId.newContainerId(
- ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1),
- cstate, "Dummy", 0));
+ ApplicationAttemptId.newInstance(app.getApplicationId(), 0), i),
+ cstate, "", 0));
}
return output;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]