Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2618
  
    @revans2
    
    To me, the race condition has nothing to do with AsyncLocalizer#`requestDownloadBaseTopologyBlobs`; it is a race on AsyncLocalizer#`topologyBlobs` between the timer tasks AsyncLocalizer#`cleanup` and AsyncLocalizer#`updateBlobs`.
    
    In my case, the exception is thrown from this code segment:
    ```java
    private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) {
        CompletableFuture<Void> [] all = new CompletableFuture[blobs.size()];
        int i = 0;
        for (final LocallyCachedBlob blob: blobs) {
            all[i] = CompletableFuture.runAsync(() -> {
                LOG.debug("STARTING download of {}", blob);
                try (ClientBlobStore blobStore = getClientBlobStore()) {
                    boolean done = false;
                    long failures = 0;
                    while (!done) {
                        try {
                            synchronized (blob) {
                                long localVersion = blob.getLocalVersion();
                                // KeyNotFoundException is thrown here first, then it will goes to
                                long remoteVersion = blob.getRemoteVersion(blobStore);
                                if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
                                    try {
                                        long newVersion = blob.downloadToTempLocation(blobStore);
                                        blob.informAllOfChangeAndWaitForConsensus();
                                        blob.commitNewVersion(newVersion);
                                        blob.informAllChangeComplete();
                                    } finally {
                                        blob.cleanupOrphanedData();
                                    }
                                }
                            }
                            done = true;
                        } catch (KeyNotFoundException kne) {
                            //no blob exist on the cluster, cancel downloading.
                            done = true;
                        } catch (Exception e) {
                            failures++;
                            if (failures > blobDownloadRetries) {
                                throw new RuntimeException("Could not download...", e);
                            }
                            LOG.warn("Failed to download blob {} will try again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e);
                            Utils.sleep(ATTEMPTS_INTERVAL_TIME);
                        }
                    }
                }
                LOG.debug("FINISHED download of {}", blob);
            }, execService);
            i++;
        }
        return CompletableFuture.allOf(all);
    }
    ```
    
    When AsyncLocalizer starts, it schedules two timer tasks:
    ```java
    execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
    LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
    execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
    ```
    This bug happens when cleanup runs after updateBlobs for the stale keys of killed topologies: the keys are still in `topologyBlobs`, so updateBlobs still gets a KeyNotFoundException.
    
    Although scheduleWithFixedDelay is event-triggered and only one task executes at a time, we still cannot guarantee which of these two timer tasks runs first, because they take different amounts of time to run.
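
    To make the ordering dependence concrete, here is a minimal sketch. It is not Storm code: `BlobRaceSketch`, `remoteStore`, `missingKeys`, the blob key, and the local `KeyNotFoundException` are all hypothetical stand-ins, and the two "tasks" are called sequentially so the result is deterministic. It only shows that the same state yields a KeyNotFoundException or not depending on which task runs first.

    ```java
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical model of the race; none of these names are Storm's real classes.
    class BlobRaceSketch {
        static class KeyNotFoundException extends Exception {
            KeyNotFoundException(String key) { super(key); }
        }

        // Keys that still exist in the simulated remote blob store.
        final Set<String> remoteStore = ConcurrentHashMap.newKeySet();
        // Keys the localizer still tracks, playing the role of topologyBlobs.
        final Map<String, Long> topologyBlobs = new ConcurrentHashMap<>();

        long getRemoteVersion(String key) throws KeyNotFoundException {
            if (!remoteStore.contains(key)) {
                throw new KeyNotFoundException(key);
            }
            return 1L;
        }

        // Simulated cleanup: stop tracking keys whose blobs are gone.
        void cleanup() {
            topologyBlobs.keySet().removeIf(k -> !remoteStore.contains(k));
        }

        // Simulated updateBlobs: returns how many tracked keys were missing remotely.
        int updateBlobs() {
            int missing = 0;
            for (String key : topologyBlobs.keySet()) {
                try {
                    getRemoteVersion(key);
                } catch (KeyNotFoundException e) {
                    missing++;
                }
            }
            return missing;
        }

        // Runs both tasks once, in the requested order, after a topology is killed.
        static int missingKeys(boolean cleanupFirst) {
            BlobRaceSketch s = new BlobRaceSketch();
            s.remoteStore.add("topo-1-stormjar.jar");
            s.topologyBlobs.put("topo-1-stormjar.jar", 1L);
            s.remoteStore.remove("topo-1-stormjar.jar"); // topology killed, blob deleted
            if (cleanupFirst) {
                s.cleanup();
            }
            int missing = s.updateBlobs();
            if (!cleanupFirst) {
                s.cleanup();
            }
            return missing;
        }

        public static void main(String[] args) {
            System.out.println("cleanup first: " + missingKeys(true));      // prints 0
            System.out.println("updateBlobs first: " + missingKeys(false)); // prints 1
        }
    }
    ```

    In the real localizer the order is decided by timing rather than by a flag, which is why the exception only shows up some of the time.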

