This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2764236  YARN-9011. Race condition during decommissioning. Contributed by Peter Bacsko
2764236 is described below

commit 27642367ef3409a9ca93747c6c2cc279c087a4c0
Author: Szilard Nemeth <snem...@apache.org>
AuthorDate: Mon Nov 18 16:29:39 2019 +0100

    YARN-9011. Race condition during decommissioning. Contributed by Peter Bacsko
---
 .../org/apache/hadoop/util/HostsFileReader.java    | 35 ++++++++++++-
 .../apache/hadoop/util/TestHostsFileReader.java    | 61 +++++++++++++++++++++-
 .../server/resourcemanager/NodesListManager.java   | 43 ++++++++++++---
 .../resourcemanager/ResourceTrackerService.java    | 13 +++--
 4 files changed, 140 insertions(+), 12 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index 6f41b8c..5141740 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -52,6 +52,8 @@ public class HostsFileReader {
       .class);
 
   private final AtomicReference<HostDetails> current;
+  private final AtomicReference<HostDetails> lazyLoaded =
+      new AtomicReference<>();
 
   public HostsFileReader(String inFile,
                          String exFile) throws IOException {
@@ -187,7 +189,18 @@ public class HostsFileReader {
 
   public void refresh(String includesFile, String excludesFile)
       throws IOException {
-    LOG.info("Refreshing hosts (include/exclude) list");
+    refreshInternal(includesFile, excludesFile, false);
+  }
+
+  public void lazyRefresh(String includesFile, String excludesFile)
+      throws IOException { // staged until finishRefresh() is called
+    refreshInternal(includesFile, excludesFile, /* lazy = */ true);
+  }
+
+  private void refreshInternal(String includesFile, String excludesFile,
+      boolean lazy) throws IOException {
+    LOG.info("Refreshing hosts (include/exclude) list (lazy refresh = {})",
+        lazy);
     HostDetails oldDetails = current.get();
     Set<String> newIncludes = oldDetails.includes;
     Map<String, Integer> newExcludes = oldDetails.excludes;
@@ -203,7 +216,21 @@ public class HostsFileReader {
     }
     HostDetails newDetails = new HostDetails(includesFile, newIncludes,
         excludesFile, newExcludes);
-    current.set(newDetails);
+
+    if (lazy) {
+      lazyLoaded.set(newDetails);
+    } else {
+      current.set(newDetails);
+    }
+  }
+
+  public void finishRefresh() {
+    if (lazyLoaded.get() == null) {
+      throw new IllegalStateException(
+          "Cannot finish refresh - call lazyRefresh() first");
+    }
+    current.set(lazyLoaded.get());
+    lazyLoaded.set(null);
   }
 
   @Private
@@ -279,6 +306,10 @@ public class HostsFileReader {
     return current.get();
   }
 
+  public HostDetails getLazyLoadedHostDetails() {
+    return lazyLoaded.get(); // null when no lazyRefresh() is pending
+  }
+
   public void setIncludesFile(String includesFile) {
     LOG.info("Setting the includes file to " + includesFile);
     HostDetails oldDetails = current.get();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
index fd9966f..60dda98 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.util;
 
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.nio.file.NoSuchFileException;
 import java.util.Map;
 
@@ -347,4 +348,62 @@ public class TestHostsFileReader {
     assertTrue(excludes.get("host5") == 1800);
     assertTrue(excludes.get("host6") == 1800);
   }
-}
+
+  @Test
+  public void testLazyRefresh() throws IOException {
+    FileWriter efw = new FileWriter(excludesFile);
+    FileWriter ifw = new FileWriter(includesFile);
+
+    efw.write("host1\n");
+    efw.write("host2\n");
+    efw.close();
+    ifw.write("host3\n");
+    ifw.write("host4\n");
+    ifw.close();
+
+    HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile);
+
+    ifw = new FileWriter(includesFile); // truncate includes to empty
+    ifw.close();
+
+    efw = new FileWriter(excludesFile, true); // append to excludes
+    efw.write("host3\n");
+    efw.write("host4\n");
+    efw.close();
+
+    hfp.lazyRefresh(includesFile, excludesFile); // stage only
+
+    HostDetails details = hfp.getHostDetails();
+    HostDetails lazyDetails = hfp.getLazyLoadedHostDetails();
+
+    assertEquals("Details: no. of excluded hosts", 2,
+        details.getExcludedHosts().size());
+    assertEquals("Details: no. of included hosts", 2,
+        details.getIncludedHosts().size());
+    assertEquals("LazyDetails: no. of excluded hosts", 4,
+        lazyDetails.getExcludedHosts().size());
+    assertEquals("LazyDetails: no. of included hosts", 0,
+        lazyDetails.getIncludedHosts().size());
+
+    hfp.finishRefresh(); // publish the staged details
+
+    details = hfp.getHostDetails();
+    assertEquals("Details: no. of excluded hosts", 4,
+        details.getExcludedHosts().size());
+    assertEquals("Details: no. of included hosts", 0,
+        details.getIncludedHosts().size());
+    assertNull("Lazy host details should be null",
+        hfp.getLazyLoadedHostDetails());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testFinishRefreshWithoutLazyRefresh() throws IOException {
+    FileWriter efw = new FileWriter(excludesFile); // creates/truncates
+    FileWriter ifw = new FileWriter(includesFile); // creates/truncates
+    efw.close();
+    ifw.close();
+
+    HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile);
+    hfp.finishRefresh(); // nothing staged, so this must throw
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index b87260e..1fab075 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -84,10 +84,12 @@ public class NodesListManager extends CompositeService implements
   private Resolver resolver;
   private Timer removalTimer;
   private int nodeRemovalCheckInterval;
+  private Set<RMNode> gracefulDecommissionableNodes;
 
   public NodesListManager(RMContext rmContext) {
     super(NodesListManager.class.getName());
     this.rmContext = rmContext;
+    this.gracefulDecommissionableNodes = ConcurrentHashMap.newKeySet(); // concurrent set; queried by ResourceTrackerService
   }
 
   @Override
@@ -115,7 +117,7 @@ public class NodesListManager extends CompositeService implements
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
       setDecommissionedNMs();
-      printConfiguredHosts();
+      printConfiguredHosts(false);
     } catch (YarnException ex) {
       disableHostsFileReader(ex);
     } catch (IOException ioe) {
@@ -187,7 +189,7 @@ public class NodesListManager extends CompositeService implements
     removalTimer.cancel();
   }
 
-  private void printConfiguredHosts() {
+  private void printConfiguredHosts(boolean graceful) {
     if (!LOG.isDebugEnabled()) {
       return;
     }
@@ -198,7 +200,12 @@ public class NodesListManager extends CompositeService implements
         conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
             YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
 
-    HostDetails hostDetails = hostsReader.getHostDetails();
+    HostDetails hostDetails;
+    if (graceful) {
+      hostDetails = hostsReader.getLazyLoadedHostDetails();
+    } else {
+      hostDetails = hostsReader.getHostDetails();
+    }
     for (String include : hostDetails.getIncludedHosts()) {
       LOG.debug("include: " + include);
     }
@@ -235,8 +242,15 @@ public class NodesListManager extends CompositeService implements
         yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
             YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
     LOG.info("refreshNodes excludesFile " + excludesFile);
-    hostsReader.refresh(includesFile, excludesFile);
-    printConfiguredHosts();
+
+    if (graceful) {
+      // update hosts, but don't make it visible just yet
+      hostsReader.lazyRefresh(includesFile, excludesFile);
+    } else {
+      hostsReader.refresh(includesFile, excludesFile);
+    }
+
+    printConfiguredHosts(graceful);
 
     LOG.info("hostsReader include:{" +
         StringUtils.join(",", hostsReader.getHosts()) +
@@ -270,7 +284,14 @@ public class NodesListManager extends CompositeService implements
     // Nodes need to be decommissioned (graceful or forceful);
     List<RMNode> nodesToDecom = new ArrayList<RMNode>();
 
-    HostDetails hostDetails = hostsReader.getHostDetails();
+    HostDetails hostDetails;
+    gracefulDecommissionableNodes.clear();
+    if (graceful) {
+      hostDetails = hostsReader.getLazyLoadedHostDetails();
+    } else {
+      hostDetails = hostsReader.getHostDetails();
+    }
+
     Set<String> includes = hostDetails.getIncludedHosts();
     Map<String, Integer> excludes = hostDetails.getExcludedMap();
 
@@ -298,11 +319,13 @@ public class NodesListManager extends CompositeService implements
               s != NodeState.DECOMMISSIONING) {
             LOG.info("Gracefully decommission " + nodeStr);
             nodesToDecom.add(n);
+            gracefulDecommissionableNodes.add(n);
           } else if (s == NodeState.DECOMMISSIONING &&
                      !Objects.equals(n.getDecommissioningTimeout(),
                          timeoutToUse)) {
             LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
             nodesToDecom.add(n);
+            gracefulDecommissionableNodes.add(n);
           } else {
             LOG.info("No action for " + nodeStr);
           }
@@ -315,6 +338,10 @@ public class NodesListManager extends CompositeService implements
       }
     }
 
+    if (graceful) {
+      hostsReader.finishRefresh();
+    }
+
     for (RMNode n : nodesToRecom) {
       RMNodeEvent e = new RMNodeEvent(
           n.getNodeID(), RMNodeEventType.RECOMMISSION);
@@ -466,6 +493,10 @@ public class NodesListManager extends CompositeService implements
         hostDetails.getExcludedHosts());
   }
 
+  boolean isGracefullyDecommissionableNode(RMNode node) {
+    return gracefulDecommissionableNodes.contains(node); // cleared/refilled on each exclude-list pass
+  }
+
   private boolean isValidNode(
       String hostName, Set<String> hostsList, Set<String> excludeList) {
     String ip = resolver.resolve(hostName);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 5761937..2c89ddd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -836,10 +836,17 @@ public class ResourceTrackerService extends AbstractService implements
    */
   private boolean isNodeInDecommissioning(NodeId nodeId) {
     RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
-    if (rmNode != null &&
-        rmNode.getState().equals(NodeState.DECOMMISSIONING)) {
-      return true;
+
+    if (rmNode != null) {
+      NodeState state = rmNode.getState();
+
+      if (state == NodeState.DECOMMISSIONING ||
+          (state == NodeState.RUNNING && // queued for graceful decommission
+          this.nodesListManager.isGracefullyDecommissionableNode(rmNode))) {
+        return true;
+      }
     }
+
     return false;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to