StefanRRichter commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235432295


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##########
@@ -90,27 +80,62 @@ private void downloadDataForAllStateHandles(
             } else {
                 throw new FlinkRuntimeException("Failed to download data for 
state handles.", e);
             }
+        } finally {
+            // Unregister and close the internal closer. In a failure case, 
this should interrupt
+            // ongoing downloads.
+            closeableRegistry.unregisterCloseable(internalCloser);
+            IOUtils.closeQuietly(internalCloser);
+            if (failureCleanupRequired) {
+                // Cleanup on exception: cancel all tasks and delete the 
created directories
+                futures.forEach(future -> future.cancel(true));
+                downloadRequests.stream()
+                        .map(StateHandleDownloadSpec::getDownloadDestination)
+                        .map(Path::toFile)
+                        .forEach(FileUtils::deleteDirectoryQuietly);
+            }

Review Comment:
   About 1, we'd have to ask the original author, but the code calls 
`ExceptionUtils.stripExecutionException(e);` , so it's expecting to work on 
execution exceptions. Not that it couldn't be changed...
   
   And for 2, yes that can be done, but is it actually better or more readable? 
I think we are debating about personal taste here, except if you have a point 
while one should be better than the other way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to