This is an automated email from the ASF dual-hosted git repository. snemeth pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 2764236 YARN-9011. Race condition during decommissioning. Contributed by Peter Bacsko 2764236 is described below commit 27642367ef3409a9ca93747c6c2cc279c087a4c0 Author: Szilard Nemeth <snem...@apache.org> AuthorDate: Mon Nov 18 16:29:39 2019 +0100 YARN-9011. Race condition during decommissioning. Contributed by Peter Bacsko --- .../org/apache/hadoop/util/HostsFileReader.java | 35 ++++++++++++- .../apache/hadoop/util/TestHostsFileReader.java | 61 +++++++++++++++++++++- .../server/resourcemanager/NodesListManager.java | 43 ++++++++++++--- .../resourcemanager/ResourceTrackerService.java | 13 +++-- 4 files changed, 140 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index 6f41b8c..5141740 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -52,6 +52,8 @@ public class HostsFileReader { .class); private final AtomicReference<HostDetails> current; + private final AtomicReference<HostDetails> lazyLoaded = + new AtomicReference<>(); public HostsFileReader(String inFile, String exFile) throws IOException { @@ -187,7 +189,18 @@ public class HostsFileReader { public void refresh(String includesFile, String excludesFile) throws IOException { - LOG.info("Refreshing hosts (include/exclude) list"); + refreshInternal(includesFile, excludesFile, false); + } + + public void lazyRefresh(String includesFile, String excludesFile) + throws IOException { + refreshInternal(includesFile, excludesFile, true); + } + + private void refreshInternal(String includesFile, String excludesFile, + boolean lazy) throws IOException { + LOG.info("Refreshing hosts (include/exclude) list (lazy refresh = {})", + lazy); HostDetails oldDetails = current.get(); Set<String> newIncludes = oldDetails.includes; Map<String, Integer> newExcludes = oldDetails.excludes; @@ -203,7 +216,21 @@ public class HostsFileReader { } HostDetails newDetails = new HostDetails(includesFile, newIncludes, excludesFile, newExcludes); - current.set(newDetails); + + if (lazy) { + lazyLoaded.set(newDetails); + } else { + current.set(newDetails); + } + } + + public void finishRefresh() { + if (lazyLoaded.get() == null) { + throw new IllegalStateException( + "Cannot finish refresh - call lazyRefresh() first"); + } + current.set(lazyLoaded.get()); + lazyLoaded.set(null); } @Private @@ -279,6 +306,10 @@ public class HostsFileReader { return current.get(); } + public HostDetails getLazyLoadedHostDetails() { + return lazyLoaded.get(); + } + public void setIncludesFile(String includesFile) { LOG.info("Setting the includes file to " + includesFile); HostDetails oldDetails = current.get(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index fd9966f..60dda98 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.util; import java.io.File; import java.io.FileWriter; +import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Map; @@ -347,4 +348,62 @@ public class TestHostsFileReader { assertTrue(excludes.get("host5") == 1800); assertTrue(excludes.get("host6") == 1800); } -} + + @Test + public void testLazyRefresh() throws IOException { + FileWriter efw = new FileWriter(excludesFile); + FileWriter ifw = new FileWriter(includesFile); + + efw.write("host1\n"); + efw.write("host2\n"); + efw.close(); + ifw.write("host3\n"); + ifw.write("host4\n"); + ifw.close(); + + HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile); + + ifw = new FileWriter(includesFile); + ifw.close(); + + efw = new FileWriter(excludesFile, true); + efw.write("host3\n"); + efw.write("host4\n"); + efw.close(); + + hfp.lazyRefresh(includesFile, excludesFile); + + HostDetails details = hfp.getHostDetails(); + HostDetails lazyDetails = hfp.getLazyLoadedHostDetails(); + + assertEquals("Details: no. of excluded hosts", 2, + details.getExcludedHosts().size()); + assertEquals("Details: no. of included hosts", 2, + details.getIncludedHosts().size()); + assertEquals("LazyDetails: no. of excluded hosts", 4, + lazyDetails.getExcludedHosts().size()); + assertEquals("LayDetails: no. of included hosts", 0, + lazyDetails.getIncludedHosts().size()); + + hfp.finishRefresh(); + + details = hfp.getHostDetails(); + assertEquals("Details: no. of excluded hosts", 4, + details.getExcludedHosts().size()); + assertEquals("Details: no. of included hosts", 0, + details.getIncludedHosts().size()); + assertNull("Lazy host details should be null", + hfp.getLazyLoadedHostDetails()); + } + + @Test(expected = IllegalStateException.class) + public void testFinishRefreshWithoutLazyRefresh() throws IOException { + FileWriter efw = new FileWriter(excludesFile); + FileWriter ifw = new FileWriter(includesFile); + efw.close(); + ifw.close(); + + HostsFileReader hfp = new HostsFileReader(includesFile, excludesFile); + hfp.finishRefresh(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index b87260e..1fab075 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -84,10 +84,12 @@ public class NodesListManager extends CompositeService implements private Resolver resolver; private Timer removalTimer; private int nodeRemovalCheckInterval; + private Set<RMNode> gracefulDecommissionableNodes; public NodesListManager(RMContext rmContext) { super(NodesListManager.class.getName()); this.rmContext = rmContext; + this.gracefulDecommissionableNodes = ConcurrentHashMap.newKeySet(); } @Override @@ -115,7 +117,7 @@ public class NodesListManager extends CompositeService implements this.hostsReader = createHostsFileReader(this.includesFile, this.excludesFile); setDecommissionedNMs(); - printConfiguredHosts(); + printConfiguredHosts(false); } catch (YarnException ex) { disableHostsFileReader(ex); } catch (IOException ioe) { @@ -187,7 +189,7 @@ public class NodesListManager extends CompositeService implements removalTimer.cancel(); } - private void printConfiguredHosts() { + private void printConfiguredHosts(boolean graceful) { if (!LOG.isDebugEnabled()) { return; } @@ -198,7 +200,12 @@ public class NodesListManager extends CompositeService implements conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); - HostDetails hostDetails = hostsReader.getHostDetails(); + HostDetails hostDetails; + if (graceful) { + hostDetails = hostsReader.getLazyLoadedHostDetails(); + } else { + hostDetails = hostsReader.getHostDetails(); + } for (String include : hostDetails.getIncludedHosts()) { LOG.debug("include: " + include); } @@ -235,8 +242,15 @@ public class NodesListManager extends CompositeService implements yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); LOG.info("refreshNodes excludesFile " + excludesFile); - hostsReader.refresh(includesFile, excludesFile); - printConfiguredHosts(); + + if (graceful) { + // update hosts, but don't make it visible just yet + hostsReader.lazyRefresh(includesFile, excludesFile); + } else { + hostsReader.refresh(includesFile, excludesFile); + } + + printConfiguredHosts(graceful); LOG.info("hostsReader include:{" + StringUtils.join(",", hostsReader.getHosts()) + @@ -270,7 +284,14 @@ public class NodesListManager extends CompositeService implements // Nodes need to be decommissioned (graceful or forceful); List<RMNode> nodesToDecom = new ArrayList<RMNode>(); - HostDetails hostDetails = hostsReader.getHostDetails(); + HostDetails hostDetails; + gracefulDecommissionableNodes.clear(); + if (graceful) { + hostDetails = hostsReader.getLazyLoadedHostDetails(); + } else { + hostDetails = hostsReader.getHostDetails(); + } + Set<String> includes = hostDetails.getIncludedHosts(); Map<String, Integer> excludes = hostDetails.getExcludedMap(); @@ -298,11 +319,13 @@ public class NodesListManager extends CompositeService implements s != NodeState.DECOMMISSIONING) { LOG.info("Gracefully decommission " + nodeStr); nodesToDecom.add(n); + gracefulDecommissionableNodes.add(n); } else if (s == NodeState.DECOMMISSIONING && !Objects.equals(n.getDecommissioningTimeout(), timeoutToUse)) { LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse); nodesToDecom.add(n); + gracefulDecommissionableNodes.add(n); } else { LOG.info("No action for " + nodeStr); } @@ -315,6 +338,10 @@ public class NodesListManager extends CompositeService implements } } + if (graceful) { + hostsReader.finishRefresh(); + } + for (RMNode n : nodesToRecom) { RMNodeEvent e = new RMNodeEvent( n.getNodeID(), RMNodeEventType.RECOMMISSION); @@ -466,6 +493,10 @@ public class NodesListManager extends CompositeService implements hostDetails.getExcludedHosts()); } + boolean isGracefullyDecommissionableNode(RMNode node) { + return gracefulDecommissionableNodes.contains(node); + } + private boolean isValidNode( String hostName, Set<String> hostsList, Set<String> excludeList) { String ip = resolver.resolve(hostName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 5761937..2c89ddd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -836,10 +836,17 @@ public class ResourceTrackerService extends AbstractService implements */ private boolean isNodeInDecommissioning(NodeId nodeId) { RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); - if (rmNode != null && - rmNode.getState().equals(NodeState.DECOMMISSIONING)) { - return true; + + if (rmNode != null) { + NodeState state = rmNode.getState(); + + if (state == NodeState.DECOMMISSIONING || + (state == NodeState.RUNNING && + this.nodesListManager.isGracefullyDecommissionableNode(rmNode))) { + return true; + } } + return false; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org