XComp commented on a change in pull request #18963:
URL: https://github.com/apache/flink/pull/18963#discussion_r819363512



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
##########
@@ -74,15 +74,21 @@ public FSDataInputStream openInputStream() throws IOException {
     }
 
     /**
-     * Discard the state by deleting the file that stores the state. If the parent directory of the
-     * state is empty after deleting the state file, it is also deleted.
+     * Discard the state by deleting the file that stores the state.
      *
      * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
      */
     @Override
     public void discardState() throws Exception {
-        FileSystem fs = getFileSystem();
-        fs.delete(filePath, false);
+        final FileSystem fs = getFileSystem();
+        if (!fs.exists(filePath)) {
+            return;
+        }
+
+        if (!fs.delete(filePath, false)) {
+            throw new IOException(
+                    filePath.getPath() + " could not be deleted for unknown reasons.");
+        }

Review comment:
       That's a good point, I should have considered that. The deletion needs 
to be atomic. I will take that into account. Thanks for the hint.
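
One possible reading of the atomicity concern is that the `exists()` check and the `delete()` call in the diff above are two separate steps, so another process could remove the file in between them. The sketch below illustrates the single-call alternative using `java.nio.file.Files.deleteIfExists` from the JDK. It is not Flink's `FileSystem` API and not the change made in this pull request; the class name `DiscardSketch` and the method `discard` are hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only; uses the JDK file API, not Flink's FileSystem.
public class DiscardSketch {

    /**
     * Deletes the file in one call, without a preceding exists() check.
     *
     * @return true if this call removed the file, false if it was already gone
     * @throws IOException if the file exists but cannot be deleted
     */
    public static boolean discard(Path file) throws IOException {
        // deleteIfExists does not throw when the file is missing, so a
        // concurrent deletion by someone else is treated as "already discarded".
        return Files.deleteIfExists(file);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("state-handle", ".bin");
        System.out.println(discard(tmp)); // first call removes the file
        System.out.println(discard(tmp)); // second call finds nothing to remove
    }
}
```

With this shape, a missing file and a failed deletion are distinguished by the return value and the exception rather than by a separate existence check. Whether Flink's `FileSystem` implementations can offer the same guarantee is a separate question that this sketch does not answer.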




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.