This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push:
new bf45e3762f1 [FLINK-38433][state/forst] Avoid delete ForSt's directory
when there happened to be an existing one (#27064)
bf45e3762f1 is described below
commit bf45e3762f189e04285e96cfe5ff0c1e2691ebd0
Author: AlexYinHan <[email protected]>
AuthorDate: Mon Sep 29 18:04:47 2025 +0800
[FLINK-38433][state/forst] Avoid delete ForSt's directory when there
happened to be an existing one (#27064)
(cherry picked from commit a644a8243a5dc8ff7790377d90ccfb58ea39a513)
---
.../flink/state/forst/ForStResourceContainer.java | 17 +++++++++++------
.../flink/state/forst/ForStResourceContainerTest.java | 4 +++-
2 files changed, 14 insertions(+), 7 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index 35ef3ba0982..d4c5a7cc831 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -86,6 +86,8 @@ public final class ForStResourceContainer implements
AutoCloseable {
@Nullable private final Path remoteForStPath;
+ private boolean remotePathNewlyCreated;
+
@Nullable private final Path localBasePath;
@Nullable private final Path localForStPath;
@@ -369,7 +371,7 @@ public final class ForStResourceContainer implements
AutoCloseable {
*/
public void prepareDirectories() throws Exception {
if (remoteBasePath != null && remoteForStPath != null) {
- prepareDirectories(remoteBasePath, remoteForStPath);
+ remotePathNewlyCreated = prepareDirectories(remoteBasePath,
remoteForStPath);
}
if (localBasePath != null && localForStPath != null) {
prepareDirectories(
@@ -402,23 +404,26 @@ public final class ForStResourceContainer implements
AutoCloseable {
return forStFileSystem;
}
- private static void prepareDirectories(Path basePath, Path dbPath) throws
IOException {
+ private static boolean prepareDirectories(Path basePath, Path dbPath)
throws IOException {
+ boolean allNewlyCreated = true;
FileSystem fileSystem = basePath.getFileSystem();
if (fileSystem.exists(basePath)) {
if (!fileSystem.getFileStatus(basePath).isDir()) {
throw new IOException("Not a directory: " + basePath);
}
+ allNewlyCreated = false;
} else if (!fileSystem.mkdirs(basePath)) {
throw new IOException(
String.format("Could not create ForSt directory at %s.",
basePath));
}
if (fileSystem.exists(dbPath)) {
- fileSystem.delete(dbPath, true);
- }
- if (!fileSystem.mkdirs(dbPath)) {
+ LOG.info("Reusing previous ForSt db directory at {}.", dbPath);
+ allNewlyCreated = false;
+ } else if (!fileSystem.mkdirs(dbPath)) {
throw new IOException(
String.format("Could not create ForSt db directory at %s.", dbPath));
}
+ return allNewlyCreated;
}
/**
@@ -436,7 +441,7 @@ public final class ForStResourceContainer implements
AutoCloseable {
}
public void forceClearRemoteDirectories() throws Exception {
- if (remoteBasePath != null) {
+ if (remoteBasePath != null && remotePathNewlyCreated) {
clearDirectories(remoteBasePath);
}
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
index 5df2cb42fda..fd9ae8ea111 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
@@ -336,7 +336,9 @@ public class ForStResourceContainerTest {
assertTrue(new File(remoteBasePath.getPath()).exists());
optionsContainer.forceClearRemoteDirectories();
- assertFalse(new File(remoteBasePath.getPath()).exists());
+
+ // Do not delete remote directory because it is not created by ForStResourceContainer
+ assertTrue(new File(remoteBasePath.getPath()).exists());
}
}