This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d3015df41894773f477eed1fd2a42857515355f2 Author: Zakelly <[email protected]> AuthorDate: Sun Mar 9 15:10:51 2025 +0800 [FLINK-37437][state/forst] Avoid the deletion of primary directory in ForSt --- .../state/forst/ForStKeyedStateBackendBuilder.java | 3 +++ .../flink/state/forst/ForStResourceContainer.java | 6 +++++ .../forst/fs/filemapping/FileMappingManager.java | 8 +++++- .../state/forst/ForStResourceContainerTest.java | 3 +++ .../state/forst/fs/FileMappingManagerTest.java | 30 ++++++++++++---------- 5 files changed, 35 insertions(+), 15 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 25649fcef7f..74a128c4dfd 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -298,6 +298,9 @@ public class ForStKeyedStateBackendBuilder<K> IOUtils.closeQuietly(restoreOperation); try { optionsContainer.clearDirectories(); + // TODO: Remove this after FLINK-37442, if we could properly handl the directory + // deletion in file mapping manager. + optionsContainer.forceClearRemoteDirectories(); } catch (Exception ex) { logger.warn( "Failed to delete ForSt local base path {}, remote base path {}.", diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index 5011678fcbd..18bf57d842a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -435,6 +435,12 @@ public final class ForStResourceContainer implements AutoCloseable { } } + public void forceClearRemoteDirectories() throws Exception { + if (remoteBasePath != null) { + clearDirectories(remoteBasePath); + } + } + private static void clearDirectories(Path basePath) throws IOException { FileSystem fileSystem = basePath.getFileSystem(); if (fileSystem.exists(basePath)) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java index db36faa823e..963645a10e0 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java @@ -276,9 +276,15 @@ public class FileMappingManager { } } + // We always treat parentEntry not owned for now, to avoid deleting directory. + // This is a safety guard but no good reason to keep it if we have a better solution. + // TODO: Reconsider the directory deletion strategy in FLINK-37442. + parentEntry.setFileOwnership(FileOwnership.NOT_OWNED); + boolean status = true; // step 2.2: release file under directory - if (parentEntry.getReferenceCount() == 0) { + if (parentEntry.getReferenceCount() == 0 + && parentEntry.getFileOwnership() != FileOwnership.NOT_OWNED) { // an empty directory status = fileSystem.delete(file, recursive); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java index 3a33a645a97..5df2cb42fda 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java @@ -333,6 +333,9 @@ public class ForStResourceContainerTest { optionsContainer.clearDirectories(); assertFalse(new File(localBasePath.getPath()).exists()); + + assertTrue(new File(remoteBasePath.getPath()).exists()); + optionsContainer.forceClearRemoteDirectories(); assertFalse(new File(remoteBasePath.getPath()).exists()); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java index 84fe596f4aa..662f006a26b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java @@ -160,8 +160,9 @@ public class FileMappingManagerTest { // delete dst fileMappingManager.deleteFileOrDirectory(new Path(dst), false); - assertThat(localFS.exists(new Path(src))).isFalse(); - assertThat(localFS.exists(new Path(testDir))).isFalse(); + // TODO FLINK-37442: properly clear directory + assertThat(localFS.exists(new Path(src))).isTrue(); + assertThat(localFS.exists(new Path(testDir))).isTrue(); } @TestTemplate @@ -191,7 +192,7 @@ public class FileMappingManagerTest { // delete src assertThat(fileMappingManager.deleteFileOrDirectory(new Path(src), false)).isEqualTo(true); assertThat(localFS.exists(new Path(testDir))).isTrue(); - assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); assertThat(localFS.exists(new Path(linkedDir))).isTrue(); assertThat(localFS.exists(new Path(src))).isTrue(); @@ -204,16 +205,17 @@ public class FileMappingManagerTest { // delete linkedSrc assertThat(fileMappingManager.deleteFileOrDirectory(new Path(linkedSrc), false)) .isEqualTo(true); - assertThat(localFS.exists(new Path(src))).isFalse(); - assertThat(localFS.exists(new Path(testDir))).isFalse(); + // file from cp cannot be deleted + assertThat(localFS.exists(new Path(src))).isTrue(); + assertThat(localFS.exists(new Path(testDir))).isTrue(); // delete linkedDir assertThat(fileMappingManager.deleteFileOrDirectory(new Path(linkedDir), true)) .isEqualTo(true); - assertThat(localFS.exists(new Path(testDir))).isFalse(); - assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse(); - assertThat(localFS.exists(new Path(linkedDir))).isFalse(); - assertThat(localFS.exists(new Path(src))).isFalse(); + assertThat(localFS.exists(new Path(testDir))).isTrue(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); + assertThat(localFS.exists(new Path(linkedDir))).isTrue(); + assertThat(localFS.exists(new Path(src))).isTrue(); } @TestTemplate @@ -267,8 +269,8 @@ public class FileMappingManagerTest { // delete linkedSrc assertThat(fileMappingManager.deleteFileOrDirectory(new Path(linkedSrc), false)) .isEqualTo(true); - assertThat(localFS.exists(new Path(src))).isFalse(); - assertThat(localFS.exists(new Path(testDir))).isFalse(); + assertThat(localFS.exists(new Path(src))).isTrue(); + assertThat(localFS.exists(new Path(testDir))).isTrue(); assertThat(localFS.exists(new Path(linkedDir))).isTrue(); assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); @@ -281,13 +283,13 @@ public class FileMappingManagerTest { if (reuseCp) { assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); } else { - assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse(); + assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue(); } - assertThat(localFS.exists(new Path(testDir))).isFalse(); + assertThat(localFS.exists(new Path(testDir))).isTrue(); // delete linkedDir assertThat(fileMappingManager.deleteFileOrDirectory(new Path(linkedDir), true)) .isEqualTo(true); - assertThat(localFS.exists(new Path(testDir))).isFalse(); + assertThat(localFS.exists(new Path(testDir))).isTrue(); } }
