This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d3015df41894773f477eed1fd2a42857515355f2
Author: Zakelly <[email protected]>
AuthorDate: Sun Mar 9 15:10:51 2025 +0800

    [FLINK-37437][state/forst] Avoid the deletion of primary directory in ForSt
---
 .../state/forst/ForStKeyedStateBackendBuilder.java |  3 +++
 .../flink/state/forst/ForStResourceContainer.java  |  6 +++++
 .../forst/fs/filemapping/FileMappingManager.java   |  8 +++++-
 .../state/forst/ForStResourceContainerTest.java    |  3 +++
 .../state/forst/fs/FileMappingManagerTest.java     | 30 ++++++++++++----------
 5 files changed, 35 insertions(+), 15 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 25649fcef7f..74a128c4dfd 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -298,6 +298,9 @@ public class ForStKeyedStateBackendBuilder<K>
             IOUtils.closeQuietly(restoreOperation);
             try {
                 optionsContainer.clearDirectories();
+                // TODO: Remove this after FLINK-37442, if we could properly 
handl the directory
+                //       deletion in file mapping manager.
+                optionsContainer.forceClearRemoteDirectories();
             } catch (Exception ex) {
                 logger.warn(
                         "Failed to delete ForSt local base path {}, remote 
base path {}.",
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index 5011678fcbd..18bf57d842a 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -435,6 +435,12 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         }
     }
 
+    public void forceClearRemoteDirectories() throws Exception {
+        if (remoteBasePath != null) {
+            clearDirectories(remoteBasePath);
+        }
+    }
+
     private static void clearDirectories(Path basePath) throws IOException {
         FileSystem fileSystem = basePath.getFileSystem();
         if (fileSystem.exists(basePath)) {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
index db36faa823e..963645a10e0 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
@@ -276,9 +276,15 @@ public class FileMappingManager {
             }
         }
 
+        // We always treat parentEntry not owned for now, to avoid deleting 
directory.
+        // This is a safety guard but no good reason to keep it if we have a 
better solution.
+        // TODO: Reconsider the directory deletion strategy in FLINK-37442.
+        parentEntry.setFileOwnership(FileOwnership.NOT_OWNED);
+
         boolean status = true;
         // step 2.2: release file under directory
-        if (parentEntry.getReferenceCount() == 0) {
+        if (parentEntry.getReferenceCount() == 0
+                && parentEntry.getFileOwnership() != FileOwnership.NOT_OWNED) {
             // an empty directory
             status = fileSystem.delete(file, recursive);
         }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
index 3a33a645a97..5df2cb42fda 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
@@ -333,6 +333,9 @@ public class ForStResourceContainerTest {
 
             optionsContainer.clearDirectories();
             assertFalse(new File(localBasePath.getPath()).exists());
+
+            assertTrue(new File(remoteBasePath.getPath()).exists());
+            optionsContainer.forceClearRemoteDirectories();
             assertFalse(new File(remoteBasePath.getPath()).exists());
         }
     }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
index 84fe596f4aa..662f006a26b 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
@@ -160,8 +160,9 @@ public class FileMappingManagerTest {
 
         // delete dst
         fileMappingManager.deleteFileOrDirectory(new Path(dst), false);
-        assertThat(localFS.exists(new Path(src))).isFalse();
-        assertThat(localFS.exists(new Path(testDir))).isFalse();
+        // TODO FLINK-37442: properly clear directory
+        assertThat(localFS.exists(new Path(src))).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
     }
 
     @TestTemplate
@@ -191,7 +192,7 @@ public class FileMappingManagerTest {
         // delete src
         assertThat(fileMappingManager.deleteFileOrDirectory(new Path(src), 
false)).isEqualTo(true);
         assertThat(localFS.exists(new Path(testDir))).isTrue();
-        assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
         assertThat(localFS.exists(new Path(linkedDir))).isTrue();
         assertThat(localFS.exists(new Path(src))).isTrue();
 
@@ -204,16 +205,17 @@ public class FileMappingManagerTest {
         // delete linkedSrc
         assertThat(fileMappingManager.deleteFileOrDirectory(new 
Path(linkedSrc), false))
                 .isEqualTo(true);
-        assertThat(localFS.exists(new Path(src))).isFalse();
-        assertThat(localFS.exists(new Path(testDir))).isFalse();
+        // file from cp cannot be deleted
+        assertThat(localFS.exists(new Path(src))).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
 
         // delete linkedDir
         assertThat(fileMappingManager.deleteFileOrDirectory(new 
Path(linkedDir), true))
                 .isEqualTo(true);
-        assertThat(localFS.exists(new Path(testDir))).isFalse();
-        assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse();
-        assertThat(localFS.exists(new Path(linkedDir))).isFalse();
-        assertThat(localFS.exists(new Path(src))).isFalse();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDir))).isTrue();
+        assertThat(localFS.exists(new Path(src))).isTrue();
     }
 
     @TestTemplate
@@ -267,8 +269,8 @@ public class FileMappingManagerTest {
         // delete linkedSrc
         assertThat(fileMappingManager.deleteFileOrDirectory(new 
Path(linkedSrc), false))
                 .isEqualTo(true);
-        assertThat(localFS.exists(new Path(src))).isFalse();
-        assertThat(localFS.exists(new Path(testDir))).isFalse();
+        assertThat(localFS.exists(new Path(src))).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
         assertThat(localFS.exists(new Path(linkedDir))).isTrue();
         assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
 
@@ -281,13 +283,13 @@ public class FileMappingManagerTest {
         if (reuseCp) {
             assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
         } else {
-            assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse();
+            assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
         }
-        assertThat(localFS.exists(new Path(testDir))).isFalse();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
 
         // delete linkedDir
         assertThat(fileMappingManager.deleteFileOrDirectory(new 
Path(linkedDir), true))
                 .isEqualTo(true);
-        assertThat(localFS.exists(new Path(testDir))).isFalse();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
     }
 }

Reply via email to