Github user danny0405 commented on the issue:
https://github.com/apache/storm/pull/2618
@revans2
As I see it, the race condition has nothing to do with AsyncLocalizer#`requestDownloadBaseTopologyBlobs`. It is a race on AsyncLocalizer#`topologyBlobs` between the timer tasks AsyncLocalizer#`cleanup` and AsyncLocalizer#`updateBlobs`.
In my cases, the exception is thrown from this code segment:
```java
private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) {
    CompletableFuture<Void>[] all = new CompletableFuture[blobs.size()];
    int i = 0;
    for (final LocallyCachedBlob blob : blobs) {
        all[i] = CompletableFuture.runAsync(() -> {
            LOG.debug("STARTING download of {}", blob);
            try (ClientBlobStore blobStore = getClientBlobStore()) {
                boolean done = false;
                long failures = 0;
                while (!done) {
                    try {
                        synchronized (blob) {
                            long localVersion = blob.getLocalVersion();
                            // KeyNotFoundException is thrown here first, then it goes to the catch below
                            long remoteVersion = blob.getRemoteVersion(blobStore);
                            if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
                                try {
                                    long newVersion = blob.downloadToTempLocation(blobStore);
                                    blob.informAllOfChangeAndWaitForConsensus();
                                    blob.commitNewVersion(newVersion);
                                    blob.informAllChangeComplete();
                                } finally {
                                    blob.cleanupOrphanedData();
                                }
                            }
                        }
                        done = true;
                    } catch (KeyNotFoundException kne) {
                        // No blob exists on the cluster; cancel downloading.
                        done = true;
                    } catch (Exception e) {
                        failures++;
                        if (failures > blobDownloadRetries) {
                            throw new RuntimeException("Could not download...", e);
                        }
                        LOG.warn("Failed to download blob {} will try again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e);
                        Utils.sleep(ATTEMPTS_INTERVAL_TIME);
                    }
                }
            }
            LOG.debug("FINISHED download of {}", blob);
        }, execService);
        i++;
    }
    return CompletableFuture.allOf(all);
}
```
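The retry/cancel behaviour of that loop can be isolated in a small, self-contained sketch. Everything in it is hypothetical: `RemoteStore`, `fetchVersion`, and the local `KeyNotFoundException` stand in for `ClientBlobStore`, `blob.getRemoteVersion`, and Storm's generated exception class, and the retry sleep is omitted. It only shows that a missing key ends the loop immediately, while other failures are retried up to a limit.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class DownloadRetrySketch {
    // Hypothetical stand-in for Storm's KeyNotFoundException (not the real class).
    static class KeyNotFoundException extends Exception {
        KeyNotFoundException(String key) { super(key); }
    }

    // Hypothetical remote store: returns the remote version of a key or throws.
    interface RemoteStore {
        long getRemoteVersion(String key) throws Exception;
    }

    /**
     * Same loop shape as downloadOrUpdate: a missing key cancels immediately,
     * any other failure is retried until more than maxRetries failures occur.
     * Returns the remote version, or -1 if the key no longer exists.
     */
    static long fetchVersion(RemoteStore store, String key, int maxRetries) {
        long failures = 0;
        while (true) {
            try {
                return store.getRemoteVersion(key);
            } catch (KeyNotFoundException kne) {
                // Blob no longer exists on the cluster: cancel instead of retrying.
                return -1L;
            } catch (Exception e) {
                failures++;
                if (failures > maxRetries) {
                    throw new RuntimeException("Could not fetch version of " + key, e);
                }
                // The real code sleeps ATTEMPTS_INTERVAL_TIME here; omitted in this sketch.
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Deleted key: exactly one call, no retries.
        RemoteStore deleted = key -> {
            calls.incrementAndGet();
            throw new KeyNotFoundException(key);
        };
        long v1 = fetchVersion(deleted, "topo-1-stormjar.jar", 3);
        System.out.println(v1 + " after " + calls.get() + " call(s)");

        AtomicInteger attempts = new AtomicInteger();
        // Transient failures: two IOExceptions, then success on the third attempt.
        RemoteStore flaky = key -> {
            if (attempts.incrementAndGet() <= 2) {
                throw new IOException("transient");
            }
            return 7L;
        };
        long v2 = fetchVersion(flaky, "topo-1-stormjar.jar", 3);
        System.out.println(v2 + " after " + attempts.get() + " attempt(s)");
    }
}
```

Running `main` prints `-1 after 1 call(s)` and then `7 after 3 attempt(s)`.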
When AsyncLocalizer starts, it schedules two timer tasks:
```java
execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
```
This bug happens because, for killed topologies, `cleanup` can remove the overdue keys only after `updateBlobs` has already run, so `updateBlobs` still sees those keys and gets a KeyNotFoundException from the blob store.
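A deterministic, single-threaded replay of that interleaving may help. It assumes (hypothetically) that killing a topology deletes its keys from the blob store before `cleanup` drops them from the local cache; `blobStore`, `topologyBlobs`, `updateBlobs`, and `cleanup` here are simplified stand-ins, not the real AsyncLocalizer members, and the real `cleanup` decides what to drop differently.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CleanupRaceSketch {
    // Hypothetical stand-in for Storm's KeyNotFoundException.
    static class KeyNotFoundException extends Exception {
        KeyNotFoundException(String key) { super("Key not found: " + key); }
    }

    // Hypothetical remote blob store: key -> version.
    final Map<String, Long> blobStore = new ConcurrentHashMap<>();
    // Hypothetical local cache playing the role of AsyncLocalizer#topologyBlobs.
    final Set<String> topologyBlobs = ConcurrentHashMap.newKeySet();

    long getRemoteVersion(String key) throws KeyNotFoundException {
        Long v = blobStore.get(key);
        if (v == null) {
            throw new KeyNotFoundException(key);
        }
        return v;
    }

    /** Like updateBlobs: checks every cached key remotely; returns how many keys were missing. */
    int updateBlobs() {
        int missing = 0;
        for (String key : topologyBlobs) {
            try {
                getRemoteVersion(key);
            } catch (KeyNotFoundException e) {
                missing++;
            }
        }
        return missing;
    }

    /** Like cleanup (simplified): drops a key that is no longer needed from the local cache. */
    void cleanup(String unusedKey) {
        topologyBlobs.remove(unusedKey);
    }

    public static void main(String[] args) {
        CleanupRaceSketch s = new CleanupRaceSketch();
        s.blobStore.put("topo-1-stormjar.jar", 1L);
        s.topologyBlobs.add("topo-1-stormjar.jar");

        // The topology is killed: its blob disappears from the blob store ...
        s.blobStore.remove("topo-1-stormjar.jar");
        // ... but updateBlobs happens to run before cleanup.
        System.out.println("updateBlobs before cleanup: " + s.updateBlobs() + " missing key(s)");
        s.cleanup("topo-1-stormjar.jar");
        System.out.println("updateBlobs after cleanup: " + s.updateBlobs() + " missing key(s)");
    }
}
```

The first `updateBlobs` call reports one missing key and the second reports none, which is the window in which the real code hits KeyNotFoundException.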
Although `scheduleWithFixedDelay` is event-triggered and only one run of a task executes at a time, we still cannot guarantee which of the two timer tasks runs first, because the two tasks take different amounts of time to run.
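The ordering problem can be illustrated with a little arithmetic on the two schedules. With `scheduleWithFixedDelay`, the next run starts `delay` after the previous run ends, so updateBlobs drifts by its own run time every cycle; with `scheduleAtFixedRate`, cleanup stays on a fixed grid as long as each run is shorter than the period. The 5 s run time for updateBlobs and the 30 s cleanup period below are assumed values, and the sketch ignores thread-pool contention.

```java
import java.util.ArrayList;
import java.util.List;

public class ScheduleDriftSketch {
    /** Start times of a fixed-delay task: the next run starts `delay` after the previous run ENDS. */
    static long[] fixedDelayStarts(long initialDelay, long delay, long runTime, int n) {
        long[] starts = new long[n];
        long t = initialDelay;
        for (int i = 0; i < n; i++) {
            starts[i] = t;
            t += runTime + delay;
        }
        return starts;
    }

    /** Start times of a fixed-rate task, assuming each run is shorter than its period. */
    static long[] fixedRateStarts(long initialDelay, long period, int n) {
        long[] starts = new long[n];
        for (int i = 0; i < n; i++) {
            starts[i] = initialDelay + i * period;
        }
        return starts;
    }

    /** Merges both schedules into one timeline; equal start times are reported as a tie. */
    static List<String> timeline(long[] update, long[] cleanup) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < update.length && j < cleanup.length) {
            if (update[i] < cleanup[j]) {
                out.add(update[i++] + "s updateBlobs");
            } else if (cleanup[j] < update[i]) {
                out.add(cleanup[j++] + "s cleanup");
            } else {
                out.add(update[i] + "s updateBlobs/cleanup (tie, order unspecified)");
                i++;
                j++;
            }
        }
        // Append whatever is left of the longer schedule.
        while (i < update.length) out.add(update[i++] + "s updateBlobs");
        while (j < cleanup.length) out.add(cleanup[j++] + "s cleanup");
        return out;
    }

    public static void main(String[] args) {
        // Assumed values: updateBlobs takes 5 s per run; cleanup runs every 30 s.
        long[] update = fixedDelayStarts(30, 30, 5, 4);  // 30, 65, 100, 135
        long[] cleanup = fixedRateStarts(30, 30, 5);      // 30, 60, 90, 120, 150
        timeline(update, cleanup).forEach(System.out::println);
    }
}
```

With these assumed numbers the timeline is 30 s (tie), 60 s cleanup, 65 s updateBlobs, 90 s cleanup, 100 s updateBlobs, and so on: the lag between each cleanup and the following updateBlobs grows by 5 s per cycle, so which task sees a just-killed topology first depends on run times rather than on the configured intervals.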
---