Fixed issues with not all maps allowing sideeffects.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3fa24d1d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3fa24d1d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3fa24d1d

Branch: refs/heads/master
Commit: 3fa24d1dad5b5b4504c217f511c98d43aebf6a14
Parents: 337aef8
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 9 16:52:25 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Apr 9 16:52:25 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/localizer/AsyncLocalizer.java   | 15 +++++++++------
 .../apache/storm/localizer/AsyncLocalizerTest.java   |  5 +++--
 2 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 1852d94..1cdcaad 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -75,10 +75,13 @@ public class AsyncLocalizer implements AutoCloseable {
 
     private final boolean isLocalMode;
     // track resources - user to resourceSet
-    protected final ConcurrentMap<String, ConcurrentMap<String, 
LocalizedResource>> userFiles = new ConcurrentHashMap<>();
-    protected final ConcurrentMap<String, ConcurrentMap<String, 
LocalizedResource>> userArchives = new ConcurrentHashMap<>();
+    //ConcurrentHashMap is explicitly used everywhere in this class because it 
uses locks to guarantee atomicity for compute and
+    // computeIfAbsent where as ConcurrentMap allows for a retry of the 
function passed in, and would require the function to have
+    // no side effects.
+    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> userFiles = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> userArchives = new ConcurrentHashMap<>();
     // topology to tracking of topology dir and resources
-    private final Map<String, CompletableFuture<Void>> blobPending;
+    private final ConcurrentHashMap<String, CompletableFuture<Void>> 
blobPending;
     private final Map<String, Object> conf;
     private final AdvancedFSOps fsOps;
     private final boolean symlinksDisabled;
@@ -118,7 +121,7 @@ public class AsyncLocalizer implements AutoCloseable {
         reconstructLocalizedResources();
 
         symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, 
false);
-        blobPending = new HashMap<>();
+        blobPending = new ConcurrentHashMap<>();
     }
 
     public AsyncLocalizer(Map<String, Object> conf) throws IOException {
@@ -629,12 +632,12 @@ public class AsyncLocalizer implements AutoCloseable {
     void cleanup() {
         LocalizedResourceRetentionSet toClean = new 
LocalizedResourceRetentionSet(cacheTargetSize);
         // need one large set of all and then clean via LRU
-        for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t : 
userArchives.entrySet()) {
+        for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t 
: userArchives.entrySet()) {
             toClean.addResources(t.getValue());
             LOG.debug("Resources to be cleaned after adding {} archives : {}", 
t.getKey(), toClean);
         }
 
-        for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t : 
userFiles.entrySet()) {
+        for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t 
: userFiles.entrySet()) {
             toClean.addResources(t.getValue());
             LOG.debug("Resources to be cleaned after adding {} files : {}", 
t.getKey(), toClean);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 63f4e9c..00a3f98 100644
--- 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -298,11 +299,11 @@ public class AsyncLocalizerTest {
         }
 
         // For testing, be careful as it doesn't clone
-        ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> 
getUserFiles() {
+        ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> getUserFiles() {
             return userFiles;
         }
 
-        ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> 
getUserArchives() {
+        ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> getUserArchives() {
             return userArchives;
         }
 

Reply via email to