Repository: storm
Updated Branches:
  refs/heads/master a8d84d671 -> b4e610c5f
STORM-3020: fix possible race condition in AsyncLocalizer Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/337aef8f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/337aef8f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/337aef8f Branch: refs/heads/master Commit: 337aef8f1291aba0ab228f6e9e0800c19b8c5ceb Parents: 5e69300 Author: Robert (Bobby) Evans <[email protected]> Authored: Tue Apr 3 14:58:14 2018 -0500 Committer: Robert (Bobby) Evans <[email protected]> Committed: Wed Apr 4 15:14:00 2018 -0500 ---------------------------------------------------------------------- .../apache/storm/localizer/AsyncLocalizer.java | 124 +++++++++---------- .../storm/localizer/LocalizedResource.java | 5 + .../storm/localizer/LocallyCachedBlob.java | 5 +- 3 files changed, 66 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/337aef8f/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java index 71f3495..1852d94 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java @@ -126,48 +126,51 @@ public class AsyncLocalizer implements AutoCloseable { } @VisibleForTesting - LocallyCachedBlob getTopoJar(final String topologyId) throws IOException { - String topoJarKey = ConfigUtils.masterStormJarKey(topologyId); - LocallyCachedBlob topoJar = topologyBlobs.get(topoJarKey); - if (topoJar == null) { - topoJar = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, - LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR); - 
topologyBlobs.put(topoJarKey, topoJar); - } - return topoJar; + LocallyCachedBlob getTopoJar(final String topologyId) { + return topologyBlobs.computeIfAbsent(ConfigUtils.masterStormJarKey(topologyId), + (tjk) -> { + try { + return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @VisibleForTesting - LocallyCachedBlob getTopoCode(final String topologyId) throws IOException { - String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId); - LocallyCachedBlob topoCode = topologyBlobs.get(topoCodeKey); - if (topoCode == null) { - topoCode = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, - LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE); - topologyBlobs.put(topoCodeKey, topoCode); - } - return topoCode; + LocallyCachedBlob getTopoCode(final String topologyId) { + return topologyBlobs.computeIfAbsent(ConfigUtils.masterStormCodeKey(topologyId), + (tck) -> { + try { + return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @VisibleForTesting - LocallyCachedBlob getTopoConf(final String topologyId) throws IOException { - String topoConfKey = ConfigUtils.masterStormConfKey(topologyId); - LocallyCachedBlob topoConf = topologyBlobs.get(topoConfKey); - if (topoConf == null) { - topoConf = new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, - LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF); - topologyBlobs.put(topoConfKey, topoConf); - } - return topoConf; + LocallyCachedBlob getTopoConf(final String topologyId) { + return topologyBlobs.computeIfAbsent(ConfigUtils.masterStormConfKey(topologyId), + (tck) -> { + try { + return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + 
LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } - private LocalizedResource getUserArchive(String user, String key) throws IOException { + private LocalizedResource getUserArchive(String user, String key) { assert user != null : "All user archives require a user present"; ConcurrentMap<String, LocalizedResource> keyToResource = userArchives.computeIfAbsent(user, (u) -> new ConcurrentHashMap<>()); return keyToResource.computeIfAbsent(key, (k) -> new LocalizedResource(key, localBaseDir, true, fsOps, conf, user)); } - private LocalizedResource getUserFile(String user, String key) throws IOException { + private LocalizedResource getUserFile(String user, String key) { assert user != null : "All user archives require a user present"; ConcurrentMap<String, LocalizedResource> keyToResource = userFiles.computeIfAbsent(user, (u) -> new ConcurrentHashMap<>()); return keyToResource.computeIfAbsent(key, (k) -> new LocalizedResource(key, localBaseDir, false, fsOps, conf, user)); @@ -181,50 +184,45 @@ public class AsyncLocalizer implements AutoCloseable { * @return a Future that indicates when they are all downloaded. * @throws IOException if there was an error while trying doing it. 
*/ - public synchronized CompletableFuture<Void> requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port, + public CompletableFuture<Void> requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port, final BlobChangingCallback cb) throws IOException { final PortAndAssignment pna = new PortAndAssignment(port, assignment); final String topologyId = pna.getToplogyId(); CompletableFuture<Void> baseBlobs = requestDownloadBaseTopologyBlobs(pna, cb); - return baseBlobs.thenComposeAsync((v) -> { - CompletableFuture<Void> localResource = blobPending.get(topologyId); - if (localResource == null) { - Supplier<Void> supplier = new DownloadBlobs(pna, cb); - localResource = CompletableFuture.supplyAsync(supplier, execService); - blobPending.put(topologyId, localResource); - } else { - try { - addReferencesToBlobs(pna, cb); - } catch (Exception e) { - throw new RuntimeException(e); + return baseBlobs.thenComposeAsync((v) -> + blobPending.compute(topologyId, (tid, old) -> { + CompletableFuture<Void> ret = old; + if (ret == null) { + ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService); + } else { + try { + addReferencesToBlobs(pna, cb); + } catch (Exception e) { + throw new RuntimeException(e); + } } - } - LOG.debug("Reserved blobs {} {}", topologyId, localResource); - return localResource; - }); + LOG.debug("Reserved blobs {} {}", topologyId, ret); + return ret; + })); } @VisibleForTesting - synchronized CompletableFuture<Void> requestDownloadBaseTopologyBlobs(PortAndAssignment pna, BlobChangingCallback cb) + CompletableFuture<Void> requestDownloadBaseTopologyBlobs(PortAndAssignment pna, BlobChangingCallback cb) throws IOException { final String topologyId = pna.getToplogyId(); - LocallyCachedBlob topoJar = getTopoJar(topologyId); + final LocallyCachedBlob topoJar = getTopoJar(topologyId); topoJar.addReference(pna, cb); - LocallyCachedBlob topoCode = getTopoCode(topologyId); + final LocallyCachedBlob topoCode = 
getTopoCode(topologyId); topoCode.addReference(pna, cb); - LocallyCachedBlob topoConf = getTopoConf(topologyId); + final LocallyCachedBlob topoConf = getTopoConf(topologyId); topoConf.addReference(pna, cb); - CompletableFuture<Void> ret = topologyBasicDownloaded.get(topologyId); - if (ret == null) { - ret = downloadOrUpdate(topoJar, topoCode, topoConf); - topologyBasicDownloaded.put(topologyId, ret); - } - return ret; + return topologyBasicDownloaded.computeIfAbsent(topologyId, + (tid) -> downloadOrUpdate(topoJar, topoCode, topoConf)); } private static final int ATTEMPTS_INTERVAL_TIME = 100; @@ -433,7 +431,7 @@ public class AsyncLocalizer implements AutoCloseable { * indicate that if they change the worker should be restarted. * @throws IOException on any error trying to recover the state. */ - public synchronized void recoverRunningTopology(final LocalAssignment currentAssignment, final int port, + public void recoverRunningTopology(final LocalAssignment currentAssignment, final int port, final BlobChangingCallback cb) throws IOException { final PortAndAssignment pna = new PortAndAssignment(port, currentAssignment); final String topologyId = pna.getToplogyId(); @@ -447,11 +445,7 @@ public class AsyncLocalizer implements AutoCloseable { LocallyCachedBlob topoConf = getTopoConf(topologyId); topoConf.addReference(pna, cb); - CompletableFuture<Void> localResource = blobPending.get(topologyId); - if (localResource == null) { - localResource = ALL_DONE_FUTURE; - blobPending.put(topologyId, localResource); - } + CompletableFuture<Void> localResource = blobPending.computeIfAbsent(topologyId, (tid) -> ALL_DONE_FUTURE); try { addReferencesToBlobs(pna, cb); @@ -469,7 +463,7 @@ public class AsyncLocalizer implements AutoCloseable { * @param port the port the topology is running on * @throws IOException on any error */ - public synchronized void releaseSlotFor(LocalAssignment assignment, int port) throws IOException { + public void releaseSlotFor(LocalAssignment assignment, 
int port) throws IOException { PortAndAssignment pna = new PortAndAssignment(port, assignment); final String topologyId = assignment.get_topology_id(); LOG.debug("Releasing slot for {} {}", topologyId, port); @@ -546,7 +540,7 @@ public class AsyncLocalizer implements AutoCloseable { } // ignores invalid user/topo/key - synchronized void removeBlobReference(String key, PortAndAssignment pna, + void removeBlobReference(String key, PortAndAssignment pna, boolean uncompress) throws AuthorizationException, KeyNotFoundException { String user = pna.getOwner(); String topo = pna.getToplogyId(); @@ -575,7 +569,7 @@ public class AsyncLocalizer implements AutoCloseable { * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT) * and will block until all of them have been downloaded. */ - synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources, PortAndAssignment pna, BlobChangingCallback cb) + List<LocalizedResource> getBlobs(List<LocalResource> localResources, PortAndAssignment pna, BlobChangingCallback cb) throws AuthorizationException, KeyNotFoundException, IOException { if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) { throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded."); @@ -632,7 +626,7 @@ public class AsyncLocalizer implements AutoCloseable { } @VisibleForTesting - synchronized void cleanup() { + void cleanup() { LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize); // need one large set of all and then clean via LRU for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t : userArchives.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/337aef8f/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java index 1ad582b..75a2d6d 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java @@ -451,4 +451,9 @@ public class LocalizedResource extends LocallyCachedBlob { public int hashCode() { return getKey().hashCode() + Boolean.hashCode(uncompressed) + baseDir.hashCode(); } + + @Override + public String toString() { + return this.user + ":" + getKey(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/337aef8f/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java index 1f7ee00..1cbb221 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.ClientBlobStore; import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.daemon.supervisor.IAdvancedFSOps; @@ -197,7 +196,7 @@ public abstract class LocallyCachedBlob { * @param pna the slot and assignment that are using this blob. * @param cb an optional callback indicating that they want to know/synchronize when a blob is updated. 
*/ - public void addReference(final PortAndAssignment pna, BlobChangingCallback cb) { + public synchronized void addReference(final PortAndAssignment pna, BlobChangingCallback cb) { if (cb == null) { cb = NOOP_CB; } @@ -210,7 +209,7 @@ public abstract class LocallyCachedBlob { * Removes a reservation for this blob from a given slot and assignemnt. * @param pna the slot + assignment that no longer needs this blob. */ - public void removeReference(final PortAndAssignment pna) { + public synchronized void removeReference(final PortAndAssignment pna) { if (references.remove(pna) == null) { LOG.warn("{} had no reservation for {}", pna, blobDescription); }
