This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new bf45e3762f1 [FLINK-38433][state/forst] Avoid delete ForSt's directory 
when there happened to be an existing one (#27064)
bf45e3762f1 is described below

commit bf45e3762f189e04285e96cfe5ff0c1e2691ebd0
Author: AlexYinHan <[email protected]>
AuthorDate: Mon Sep 29 18:04:47 2025 +0800

    [FLINK-38433][state/forst] Avoid delete ForSt's directory when there 
happened to be an existing one (#27064)
    
    (cherry picked from commit a644a8243a5dc8ff7790377d90ccfb58ea39a513)
---
 .../flink/state/forst/ForStResourceContainer.java       | 17 +++++++++++------
 .../flink/state/forst/ForStResourceContainerTest.java   |  4 +++-
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index 35ef3ba0982..d4c5a7cc831 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -86,6 +86,8 @@ public final class ForStResourceContainer implements 
AutoCloseable {
 
     @Nullable private final Path remoteForStPath;
 
+    private boolean remotePathNewlyCreated;
+
     @Nullable private final Path localBasePath;
 
     @Nullable private final Path localForStPath;
@@ -369,7 +371,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
      */
     public void prepareDirectories() throws Exception {
         if (remoteBasePath != null && remoteForStPath != null) {
-            prepareDirectories(remoteBasePath, remoteForStPath);
+            remotePathNewlyCreated = prepareDirectories(remoteBasePath, 
remoteForStPath);
         }
         if (localBasePath != null && localForStPath != null) {
             prepareDirectories(
@@ -402,23 +404,26 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         return forStFileSystem;
     }
 
-    private static void prepareDirectories(Path basePath, Path dbPath) throws 
IOException {
+    private static boolean prepareDirectories(Path basePath, Path dbPath) 
throws IOException {
+        boolean allNewlyCreated = true;
         FileSystem fileSystem = basePath.getFileSystem();
         if (fileSystem.exists(basePath)) {
             if (!fileSystem.getFileStatus(basePath).isDir()) {
                 throw new IOException("Not a directory: " + basePath);
             }
+            allNewlyCreated = false;
         } else if (!fileSystem.mkdirs(basePath)) {
             throw new IOException(
                     String.format("Could not create ForSt directory at %s.", 
basePath));
         }
         if (fileSystem.exists(dbPath)) {
-            fileSystem.delete(dbPath, true);
-        }
-        if (!fileSystem.mkdirs(dbPath)) {
+            LOG.info("Reusing previous ForSt db directory at {}.", dbPath);
+            allNewlyCreated = false;
+        } else if (!fileSystem.mkdirs(dbPath)) {
             throw new IOException(
                     String.format("Could not create ForSt db directory at 
%s.", dbPath));
         }
+        return allNewlyCreated;
     }
 
     /**
@@ -436,7 +441,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
     }
 
     public void forceClearRemoteDirectories() throws Exception {
-        if (remoteBasePath != null) {
+        if (remoteBasePath != null && remotePathNewlyCreated) {
             clearDirectories(remoteBasePath);
         }
     }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
index 5df2cb42fda..fd9ae8ea111 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
@@ -336,7 +336,9 @@ public class ForStResourceContainerTest {
 
             assertTrue(new File(remoteBasePath.getPath()).exists());
             optionsContainer.forceClearRemoteDirectories();
-            assertFalse(new File(remoteBasePath.getPath()).exists());
+
+            // Do not delete remote directory because it is not created by 
ForStResourceContainer
+            assertTrue(new File(remoteBasePath.getPath()).exists());
         }
     }
 

Reply via email to