Repository: storm
Updated Branches:
  refs/heads/master a8d84d671 -> b4e610c5f


STORM-3020: fix possible race condition in AsyncLocalizer


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/337aef8f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/337aef8f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/337aef8f

Branch: refs/heads/master
Commit: 337aef8f1291aba0ab228f6e9e0800c19b8c5ceb
Parents: 5e69300
Author: Robert (Bobby) Evans <[email protected]>
Authored: Tue Apr 3 14:58:14 2018 -0500
Committer: Robert (Bobby) Evans <[email protected]>
Committed: Wed Apr 4 15:14:00 2018 -0500

----------------------------------------------------------------------
 .../apache/storm/localizer/AsyncLocalizer.java  | 124 +++++++++----------
 .../storm/localizer/LocalizedResource.java      |   5 +
 .../storm/localizer/LocallyCachedBlob.java      |   5 +-
 3 files changed, 66 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/337aef8f/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 71f3495..1852d94 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -126,48 +126,51 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     @VisibleForTesting
-    LocallyCachedBlob getTopoJar(final String topologyId) throws IOException {
-        String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
-        LocallyCachedBlob topoJar = topologyBlobs.get(topoJarKey);
-        if (topoJar == null) {
-            topoJar = new LocallyCachedTopologyBlob(topologyId, isLocalMode, 
conf, fsOps,
-                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR);
-            topologyBlobs.put(topoJarKey, topoJar);
-        }
-        return topoJar;
+    LocallyCachedBlob getTopoJar(final String topologyId) {
+        return 
topologyBlobs.computeIfAbsent(ConfigUtils.masterStormJarKey(topologyId),
+            (tjk) -> {
+                try {
+                    return new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
+                        LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
     }
 
     @VisibleForTesting
-    LocallyCachedBlob getTopoCode(final String topologyId) throws IOException {
-        String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
-        LocallyCachedBlob topoCode = topologyBlobs.get(topoCodeKey);
-        if (topoCode == null) {
-            topoCode = new LocallyCachedTopologyBlob(topologyId, isLocalMode, 
conf, fsOps,
-                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE);
-            topologyBlobs.put(topoCodeKey, topoCode);
-        }
-        return topoCode;
+    LocallyCachedBlob getTopoCode(final String topologyId) {
+        return 
topologyBlobs.computeIfAbsent(ConfigUtils.masterStormCodeKey(topologyId),
+            (tck) -> {
+                try {
+                    return new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
+                        LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
     }
 
     @VisibleForTesting
-    LocallyCachedBlob getTopoConf(final String topologyId) throws IOException {
-        String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
-        LocallyCachedBlob topoConf = topologyBlobs.get(topoConfKey);
-        if (topoConf == null) {
-            topoConf = new LocallyCachedTopologyBlob(topologyId, isLocalMode, 
conf, fsOps,
-                LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF);
-            topologyBlobs.put(topoConfKey, topoConf);
-        }
-        return topoConf;
+    LocallyCachedBlob getTopoConf(final String topologyId) {
+        return 
topologyBlobs.computeIfAbsent(ConfigUtils.masterStormConfKey(topologyId),
+            (tck) -> {
+                try {
+                    return new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
+                        LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
     }
 
-    private LocalizedResource getUserArchive(String user, String key) throws 
IOException {
+    private LocalizedResource getUserArchive(String user, String key) {
         assert user != null : "All user archives require a user present";
         ConcurrentMap<String, LocalizedResource> keyToResource = 
userArchives.computeIfAbsent(user, (u) -> new ConcurrentHashMap<>());
         return keyToResource.computeIfAbsent(key, (k) -> new 
LocalizedResource(key, localBaseDir, true, fsOps, conf, user));
     }
 
-    private LocalizedResource getUserFile(String user, String key) throws 
IOException {
+    private LocalizedResource getUserFile(String user, String key) {
         assert user != null : "All user archives require a user present";
         ConcurrentMap<String, LocalizedResource> keyToResource = 
userFiles.computeIfAbsent(user, (u) -> new ConcurrentHashMap<>());
         return keyToResource.computeIfAbsent(key, (k) -> new 
LocalizedResource(key, localBaseDir, false, fsOps, conf, user));
@@ -181,50 +184,45 @@ public class AsyncLocalizer implements AutoCloseable {
      * @return a Future that indicates when they are all downloaded.
      * @throws IOException if there was an error while trying doing it.
      */
-    public synchronized CompletableFuture<Void> 
requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port,
+    public CompletableFuture<Void> requestDownloadTopologyBlobs(final 
LocalAssignment assignment, final int port,
                                                                              
final BlobChangingCallback cb) throws IOException {
         final PortAndAssignment pna = new PortAndAssignment(port, assignment);
         final String topologyId = pna.getToplogyId();
 
         CompletableFuture<Void> baseBlobs = 
requestDownloadBaseTopologyBlobs(pna, cb);
-        return baseBlobs.thenComposeAsync((v) -> {
-            CompletableFuture<Void> localResource = 
blobPending.get(topologyId);
-            if (localResource == null) {
-                Supplier<Void> supplier = new DownloadBlobs(pna, cb);
-                localResource = CompletableFuture.supplyAsync(supplier, 
execService);
-                blobPending.put(topologyId, localResource);
-            } else {
-                try {
-                    addReferencesToBlobs(pna, cb);
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
+        return baseBlobs.thenComposeAsync((v) ->
+            blobPending.compute(topologyId, (tid, old) -> {
+                CompletableFuture<Void> ret = old;
+                if (ret == null) {
+                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, 
cb), execService);
+                } else {
+                    try {
+                        addReferencesToBlobs(pna, cb);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
                 }
-            }
-            LOG.debug("Reserved blobs {} {}", topologyId, localResource);
-            return localResource;
-        });
+                LOG.debug("Reserved blobs {} {}", topologyId, ret);
+                return ret;
+            }));
     }
 
     @VisibleForTesting
-    synchronized CompletableFuture<Void> 
requestDownloadBaseTopologyBlobs(PortAndAssignment pna, BlobChangingCallback cb)
+    CompletableFuture<Void> requestDownloadBaseTopologyBlobs(PortAndAssignment 
pna, BlobChangingCallback cb)
         throws IOException {
         final String topologyId = pna.getToplogyId();
 
-        LocallyCachedBlob topoJar = getTopoJar(topologyId);
+        final LocallyCachedBlob topoJar = getTopoJar(topologyId);
         topoJar.addReference(pna, cb);
 
-        LocallyCachedBlob topoCode = getTopoCode(topologyId);
+        final LocallyCachedBlob topoCode = getTopoCode(topologyId);
         topoCode.addReference(pna, cb);
 
-        LocallyCachedBlob topoConf = getTopoConf(topologyId);
+        final LocallyCachedBlob topoConf = getTopoConf(topologyId);
         topoConf.addReference(pna, cb);
 
-        CompletableFuture<Void> ret = topologyBasicDownloaded.get(topologyId);
-        if (ret == null) {
-            ret = downloadOrUpdate(topoJar, topoCode, topoConf);
-            topologyBasicDownloaded.put(topologyId, ret);
-        }
-        return ret;
+        return topologyBasicDownloaded.computeIfAbsent(topologyId,
+            (tid) -> downloadOrUpdate(topoJar, topoCode, topoConf));
     }
 
     private static final int ATTEMPTS_INTERVAL_TIME = 100;
@@ -433,7 +431,7 @@ public class AsyncLocalizer implements AutoCloseable {
      *     indicate that if they change the worker should be restarted.
      * @throws IOException on any error trying to recover the state.
      */
-    public synchronized void recoverRunningTopology(final LocalAssignment 
currentAssignment, final int port,
+    public void recoverRunningTopology(final LocalAssignment 
currentAssignment, final int port,
                                                     final BlobChangingCallback 
cb) throws IOException {
         final PortAndAssignment pna = new PortAndAssignment(port, 
currentAssignment);
         final String topologyId = pna.getToplogyId();
@@ -447,11 +445,7 @@ public class AsyncLocalizer implements AutoCloseable {
         LocallyCachedBlob topoConf = getTopoConf(topologyId);
         topoConf.addReference(pna, cb);
 
-        CompletableFuture<Void> localResource = blobPending.get(topologyId);
-        if (localResource == null) {
-            localResource = ALL_DONE_FUTURE;
-            blobPending.put(topologyId, localResource);
-        }
+        CompletableFuture<Void> localResource = 
blobPending.computeIfAbsent(topologyId, (tid) -> ALL_DONE_FUTURE);
 
         try {
             addReferencesToBlobs(pna, cb);
@@ -469,7 +463,7 @@ public class AsyncLocalizer implements AutoCloseable {
      * @param port the port the topology is running on
      * @throws IOException on any error
      */
-    public synchronized void releaseSlotFor(LocalAssignment assignment, int 
port) throws IOException {
+    public void releaseSlotFor(LocalAssignment assignment, int port) throws 
IOException {
         PortAndAssignment pna = new PortAndAssignment(port, assignment);
         final String topologyId = assignment.get_topology_id();
         LOG.debug("Releasing slot for {} {}", topologyId, port);
@@ -546,7 +540,7 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     // ignores invalid user/topo/key
-    synchronized void removeBlobReference(String key, PortAndAssignment pna,
+    void removeBlobReference(String key, PortAndAssignment pna,
                                           boolean uncompress) throws 
AuthorizationException, KeyNotFoundException {
         String user = pna.getOwner();
         String topo = pna.getToplogyId();
@@ -575,7 +569,7 @@ public class AsyncLocalizer implements AutoCloseable {
      * cache, it downloads them in parallel (up to 
SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
      * and will block until all of them have been downloaded.
      */
-    synchronized List<LocalizedResource> getBlobs(List<LocalResource> 
localResources, PortAndAssignment pna, BlobChangingCallback cb)
+    List<LocalizedResource> getBlobs(List<LocalResource> localResources, 
PortAndAssignment pna, BlobChangingCallback cb)
         throws AuthorizationException, KeyNotFoundException, IOException {
         if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
             throw new KeyNotFoundException("symlinks are disabled so blobs 
cannot be downloaded.");
@@ -632,7 +626,7 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     @VisibleForTesting
-    synchronized void cleanup() {
+    void cleanup() {
         LocalizedResourceRetentionSet toClean = new 
LocalizedResourceRetentionSet(cacheTargetSize);
         // need one large set of all and then clean via LRU
         for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t : 
userArchives.entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/337aef8f/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index 1ad582b..75a2d6d 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -451,4 +451,9 @@ public class LocalizedResource extends LocallyCachedBlob {
     public int hashCode() {
         return getKey().hashCode() + Boolean.hashCode(uncompressed) + 
baseDir.hashCode();
     }
+
+    @Override
+    public String toString() {
+        return this.user + ":" + getKey();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/337aef8f/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 1f7ee00..1cbb221 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
@@ -197,7 +196,7 @@ public abstract class LocallyCachedBlob {
      * @param pna the slot and assignment that are using this blob.
      * @param cb an optional callback indicating that they want to 
know/synchronize when a blob is updated.
      */
-    public void addReference(final PortAndAssignment pna, BlobChangingCallback 
cb) {
+    public synchronized void addReference(final PortAndAssignment pna, 
BlobChangingCallback cb) {
         if (cb == null) {
             cb = NOOP_CB;
         }
@@ -210,7 +209,7 @@ public abstract class LocallyCachedBlob {
      * Removes a reservation for this blob from a given slot and assignemnt.
      * @param pna the slot + assignment that no longer needs this blob.
      */
-    public void removeReference(final PortAndAssignment pna) {
+    public synchronized void removeReference(final PortAndAssignment pna) {
         if (references.remove(pna) == null) {
             LOG.warn("{} had no reservation for {}", pna, blobDescription);
         }

Reply via email to