Fixed issues with not all maps allowing sideeffects.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3fa24d1d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3fa24d1d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3fa24d1d Branch: refs/heads/master Commit: 3fa24d1dad5b5b4504c217f511c98d43aebf6a14 Parents: 337aef8 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Mon Apr 9 16:52:25 2018 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Mon Apr 9 16:52:25 2018 -0500 ---------------------------------------------------------------------- .../org/apache/storm/localizer/AsyncLocalizer.java | 15 +++++++++------ .../apache/storm/localizer/AsyncLocalizerTest.java | 5 +++-- 2 files changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java index 1852d94..1cdcaad 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java @@ -75,10 +75,13 @@ public class AsyncLocalizer implements AutoCloseable { private final boolean isLocalMode; // track resources - user to resourceSet - protected final ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> userFiles = new ConcurrentHashMap<>(); - protected final ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> userArchives = new ConcurrentHashMap<>(); + //ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and + // computeIfAbsent where as ConcurrentMap allows for a retry of the function passed in, and would require the function to have + // no side effects. + protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userFiles = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userArchives = new ConcurrentHashMap<>(); // topology to tracking of topology dir and resources - private final Map<String, CompletableFuture<Void>> blobPending; + private final ConcurrentHashMap<String, CompletableFuture<Void>> blobPending; private final Map<String, Object> conf; private final AdvancedFSOps fsOps; private final boolean symlinksDisabled; @@ -118,7 +121,7 @@ public class AsyncLocalizer implements AutoCloseable { reconstructLocalizedResources(); symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); - blobPending = new HashMap<>(); + blobPending = new ConcurrentHashMap<>(); } public AsyncLocalizer(Map<String, Object> conf) throws IOException { @@ -629,12 +632,12 @@ public class AsyncLocalizer implements AutoCloseable { void cleanup() { LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize); // need one large set of all and then clean via LRU - for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t : userArchives.entrySet()) { + for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) { toClean.addResources(t.getValue()); LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean); } - for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t : userFiles.entrySet()) { + for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) { toClean.addResources(t.getValue()); LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean); } http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index 63f4e9c..00a3f98 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -298,11 +299,11 @@ public class AsyncLocalizerTest { } // For testing, be careful as it doesn't clone - ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> getUserFiles() { + ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> getUserFiles() { return userFiles; } - ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> getUserArchives() { + ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> getUserArchives() { return userArchives; }