agresch commented on a change in pull request #3374:
URL: https://github.com/apache/storm/pull/3374#discussion_r568705392



##########
File path: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
##########
@@ -440,9 +442,23 @@ public void recoverRunningTopology(final LocalAssignment currentAssignment, fina
      *
      * @param assignment the assignment the resources are for
      * @param port       the port the topology is running on
+     * @param downloadingBlobs any existing downloading blob futures
      * @throws IOException on any error
      */
-    public void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
+    public void releaseSlotFor(LocalAssignment assignment, int port,
+                               Collection<Future<Void>> downloadingBlobs) throws IOException {
+
+        // Make sure any downloading blobs in the background are stopped.
+        // This prevents a race condition where we could be adding references to a
+        // delayed downloading blob after the slot gets released, causing orphaned blobs.
+        for (Future future : downloadingBlobs) {
+            if (!future.isDone()) {
+                LOG.info("Canceling download of {}", future);
+                future.cancel(true);
+            }
+        }
+        downloadingBlobs.clear();

Review comment:
       I agree that drainAllChangingBlobs() should also clean this up. I will look into it. Good find.
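
For reference, here is a minimal standalone sketch of the cancel-then-clear pattern from the diff above. It is not the actual AsyncLocalizer code: the class name `DownloadCancelSketch`, the helper `cancelPending`, and the use of `CompletableFuture` as a stand-in for a blob download are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical illustration only; not part of Apache Storm.
public class DownloadCancelSketch {

    // Cancels every future that has not completed yet, then empties the
    // collection so no stale futures are left behind for a later caller.
    // Returns how many futures were actually cancelled.
    public static int cancelPending(Collection<Future<Void>> downloadingBlobs) {
        int cancelled = 0;
        for (Future<Void> future : downloadingBlobs) {
            // cancel() returns false if the future completed in the meantime.
            if (!future.isDone() && future.cancel(true)) {
                cancelled++;
            }
        }
        downloadingBlobs.clear();
        return cancelled;
    }

    public static void main(String[] args) {
        List<Future<Void>> futures = new ArrayList<>();
        // Stand-in for a download that already finished.
        futures.add(CompletableFuture.<Void>completedFuture(null));
        // Stand-in for a download that is still in flight.
        futures.add(new CompletableFuture<Void>());

        int cancelled = cancelPending(futures);
        System.out.println(cancelled + " cancelled, " + futures.size() + " left");
    }
}
```

Note that `Future.cancel(true)` is only a request: whether a running task stops promptly depends on the `Future` implementation and on how the task handles interruption, so whatever cleanup drainAllChangingBlobs() does would likely still need to cope with a download that finishes late.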




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

