StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1234916685


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##########
@@ -90,27 +80,62 @@ private void downloadDataForAllStateHandles(
             } else {
                 throw new FlinkRuntimeException("Failed to download data for state handles.", e);
             }
+        } finally {
+            // Unregister and close the internal closer. In a failure case, this should interrupt
+            // ongoing downloads.
+            closeableRegistry.unregisterCloseable(internalCloser);
+            IOUtils.closeQuietly(internalCloser);
+            if (failureCleanupRequired) {
+                // Cleanup on exception: cancel all tasks and delete the created directories
+                futures.forEach(future -> future.cancel(true));
+                downloadRequests.stream()
+                        .map(StateHandleDownloadSpec::getDownloadDestination)
+                        .map(Path::toFile)
+                        .forEach(FileUtils::deleteDirectoryQuietly);
+            }

Review Comment:
   Two reasons:
   - The code was written to catch only `ExecutionException`, but I want to make
sure the cleanup also happens for any other exception.
   - For cleanup, I first want to close the registry to interrupt the IO in all
parallel tasks, but I also want to close the registry in the finally block, so
doing the cleanup anywhere else would duplicate slightly more code.
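
The two reasons above can be illustrated with a minimal, JDK-only sketch. This is not the Flink code: `CleanupInFinallySketch`, `RecordingCloser`, `runAll`, `deleteQuietly` and `directorySurvives` are hypothetical names made up for illustration, and the closer here only records that it was closed instead of interrupting real IO. The sketch assumes the same shape as the diff: a `failureCleanupRequired` flag that is cleared only on the success path, with the close and the cleanup both living in `finally`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CleanupInFinallySketch {

    /** Hypothetical stand-in for the internal closer; it only records that close() ran. */
    static final class RecordingCloser implements Closeable {
        volatile boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Runs all tasks; on any failure, cancels the futures and deletes the directories. */
    static void runAll(
            ExecutorService executor,
            List<Callable<Void>> tasks,
            List<Path> directories,
            RecordingCloser closer) {
        List<Future<Void>> futures = new ArrayList<>();
        boolean failureCleanupRequired = true;
        try {
            for (Callable<Void> task : tasks) {
                futures.add(executor.submit(task)); // may throw RejectedExecutionException
            }
            for (Future<Void> future : futures) {
                future.get(); // may throw ExecutionException
            }
            failureCleanupRequired = false; // reached only if nothing above threw
        } catch (ExecutionException e) {
            throw new RuntimeException("Task failed.", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for tasks.", e);
        } finally {
            // The closer is closed exactly once, on success and on every kind of failure.
            closer.close();
            if (failureCleanupRequired) {
                futures.forEach(future -> future.cancel(true));
                directories.forEach(CleanupInFinallySketch::deleteQuietly);
            }
        }
    }

    /** Deletes a directory and ignores IO errors; enough here because it stays empty. */
    static void deleteQuietly(Path directory) {
        try {
            Files.deleteIfExists(directory);
        } catch (IOException ignored) {
            // best-effort cleanup
        }
    }

    /** Runs one task against a fresh temp directory; returns true if the directory survived. */
    static boolean directorySurvives(ExecutorService executor, Callable<Void> task)
            throws IOException {
        Path directory = Files.createTempDirectory("cleanup-sketch");
        RecordingCloser closer = new RecordingCloser();
        try {
            runAll(executor, List.of(task), List.of(directory), closer);
        } catch (RuntimeException expectedInFailureCases) {
            // swallowed so that the caller can inspect the outcome
        }
        if (!closer.closed) {
            throw new IllegalStateException("closer was not closed");
        }
        boolean survived = Files.exists(directory);
        deleteQuietly(directory); // do not leave temp directories behind
        return survived;
    }

    public static void main(String[] args) throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Success: the flag is cleared, so the directory is kept.
        System.out.println(directorySurvives(executor, () -> null));
        // Task failure: surfaces as ExecutionException, directory is removed.
        System.out.println(directorySurvives(executor, () -> { throw new IOException("boom"); }));
        // Submission failure: RejectedExecutionException bypasses the catch blocks,
        // but the finally block still removes the directory.
        executor.shutdown();
        System.out.println(directorySurvives(executor, () -> null));
    }
}
```

In the third case no `catch` clause matches, which is the situation the first bullet is concerned with: cleanup placed only in `catch (ExecutionException e)` would be skipped there. Keeping the close and the cleanup together in `finally` also means the close call appears once rather than in both the failure path and the `finally` block.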


