This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e69a077af84 YARN-11497 : Support removal of only selective node states 
in untracked removal flow (#5681)
e69a077af84 is described below

commit e69a077af841ee7ae67b6aceff62065448a10ba5
Author: mudit-97 <32608527+mudit...@users.noreply.github.com>
AuthorDate: Mon Jun 5 15:06:10 2023 +0530

    YARN-11497 : Support removal of only selective node states in untracked 
removal flow (#5681)
    
    Co-authored-by: mudit.sharma <mudit.sha...@flipkart.com>
    Reviewed-by: Shilun Fan <slfan1...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java | 13 ++++
 .../src/main/resources/yarn-default.xml            | 12 ++++
 .../server/resourcemanager/NodesListManager.java   | 14 ++++
 .../TestResourceTrackerService.java                | 81 ++++++++++++++++++++++
 4 files changed, 120 insertions(+)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ff531cdc2a7..56bbe8843d4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1210,6 +1210,19 @@ public class YarnConfiguration extends Configuration {
   public static final boolean
       DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH = false;
 
+  /**
+   * When non-empty, untracked nodes are removed only if their state is one of
+   * the states listed in this config. When empty, nodes in every state are
+   * eligible for removal. Values are comma-separated, case-sensitive names of
+   * the NodeState enum, e.g. LOST,DECOMMISSIONED.
+   *
+   * @see org.apache.hadoop.yarn.api.records.NodeState
+   */
+  public static final String RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE =
+      RM_PREFIX + "node-removal-untracked.node-selective-states-to-remove";
+  public static final String[]
+      DEFAULT_RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE = {};
+
   /**
    * RM proxy users' prefix
    */
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index aea92260013..0069e9ef360 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5316,4 +5316,16 @@
     </description>
   </property>
 
+  <property>
+    <description>
+      If YARN untracked node removal is enabled, this config controls which
+      node states are eligible for removal. An untracked node whose state is
+      not listed here is skipped during removal. If this value is empty, nodes
+      in every state are eligible for removal. Values are comma-separated,
+      case-sensitive names of org.apache.hadoop.yarn.api.records.NodeState.
+    </description>
+    
<name>yarn.resourcemanager.node-removal-untracked.node-selective-states-to-remove</name>
+    <value></value>
+  </property>
+
 </configuration>
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 21be92169a3..6b5bf0e4f0b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -30,7 +31,9 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -128,6 +131,10 @@ public class NodesListManager extends CompositeService 
implements
     enableNodeUntrackedWithoutIncludePath = conf.getBoolean(
         YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH,
         
YarnConfiguration.DEFAULT_RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH);
+    final Set<String> untrackedSelectiveStatesToRemove = Arrays.stream(conf.getTrimmedStrings(
+        YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE,
+        YarnConfiguration.DEFAULT_RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE))
+        .collect(Collectors.toSet()); // trimmed entries; empty/blank value -> empty set
     final int nodeRemovalTimeout =
         conf.getInt(
             YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
@@ -146,6 +153,13 @@ public class NodesListManager extends CompositeService 
implements
           NodeId nodeId = entry.getKey();
           RMNode rmNode = entry.getValue();
           if (isUntrackedNode(rmNode.getHostName())) {
+            if (CollectionUtils.isNotEmpty(untrackedSelectiveStatesToRemove) &&
+                !untrackedSelectiveStatesToRemove.contains(rmNode.getState().toString())) {
+              LOG.debug("Untracked node {}, with node state {} is not part of " +
+                  "node-removal-untracked.node-selective-states-to-remove config",
+                  rmNode.getHostName(), rmNode.getState());
+              continue; // DEBUG, not WARN: presumably hit on every removal-timer run
+            }
             if (rmNode.getUntrackedTimeStamp() == 0) {
               rmNode.setUntrackedTimeStamp(now);
             } else
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e4f0b79e373..358cf9e0f8f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -3222,4 +3222,85 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
 
     rm.close();
   }
+
+  /**
+   * Decommissioning with selective states for untracked nodes.
+   */
+  @Test
+  public void testDecommissionWithSelectiveStates() throws Exception {
+    // clear exclude hosts
+    writeToHostsFile(excludeHostFile, "");
+    // init conf:
+    // (1) set untracked removal timeout to 500ms
+    // (2) set exclude path (no include path)
+    // (3) enable node untracked without pre-configured include path
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, 500);
+    conf.setBoolean(YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true);
+    conf.setStrings(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE,
+        "DECOMMISSIONED", "SHUTDOWN");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath());
+
+    rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    MockNM nm2 = rm.registerNode("host2:1234", 10240);
+    MockNM nm3 = rm.registerNode("host3:1234", 10240);
+    MockNM nm4 = rm.registerNode("host4:1234", 10240);
+    assertEquals(4, rm.getRMContext().getRMNodes().size());
+    assertEquals(0, rm.getRMContext().getInactiveRMNodes().size());
+
+    // decommission nm1 via adding nm1 into exclude hosts
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    writeToHostsFile(excludeHostFile, "host1");
+    rm.getNodesListManager().refreshNodes(conf);
+    rm.drainEvents();
+    assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
+    assertEquals(3, rm.getRMContext().getRMNodes().size());
+    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
+    assertEquals(new HashSet<>(Arrays.asList(nm1.getNodeId())),
+        rm.getRMContext().getInactiveRMNodes().keySet());
+
+    // remove nm1 from exclude hosts, so that it will be marked as untracked
+    // and removed from inactive nodes after the timeout
+    writeToHostsFile(excludeHostFile, "");
+    rm.getNodesListManager().refreshNodes(conf);
+    // DECOMMISSIONED is a selected state: nm1 must be removed within 1 second
+    GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 0,
+        100, 1000);
+
+    // lost nm2
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    rm.getRMContext().getDispatcher().getEventHandler()
+        .handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE));
+    rm.drainEvents();
+    assertEquals(NodeState.LOST, rmNode2.getState());
+    assertEquals(2, rm.getRMContext().getRMNodes().size());
+    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
+    // LOST is not selected: wait well past the 500ms timeout, nm2 must remain
+    Thread.sleep(1500);
+    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
+
+    // shutdown nm3
+    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
+    rm.getRMContext().getDispatcher().getEventHandler()
+        .handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN));
+    rm.drainEvents();
+    assertEquals(NodeState.SHUTDOWN, rmNode3.getState());
+    assertEquals(1, rm.getRMContext().getRMNodes().size());
+    assertEquals(2, rm.getRMContext().getInactiveRMNodes().size());
+    // SHUTDOWN is a selected state: nm3 must be removed within 1 second
+    GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1,
+        100, 1000);
+
+    // nm4 is still active node at last
+    assertEquals(new HashSet<>(Arrays.asList(nm4.getNodeId())),
+        rm.getRMContext().getRMNodes().keySet());
+
+    // nm2 is still inactive node at last, not removed
+    assertEquals(new HashSet<>(Arrays.asList(nm2.getNodeId())),
+        rm.getRMContext().getInactiveRMNodes().keySet());
+
+    rm.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to