This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a79f2e2f1b4 [FLINK-37031][state/forst] Bump forstjni to 0.1.5 && make 
ForStFlinkFileSystem thread safe (#25927)
a79f2e2f1b4 is described below

commit a79f2e2f1b434d050cb1b2d75a909b6a313d9e07
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Thu Jan 9 22:44:04 2025 +0800

    [FLINK-37031][state/forst] Bump forstjni to 0.1.5 && make 
ForStFlinkFileSystem thread safe (#25927)
---
 flink-dist/src/main/resources/META-INF/NOTICE      |  2 +-
 .../flink-statebackend-forst/pom.xml               |  2 +-
 .../flink/state/forst/ForStResourceContainer.java  | 24 +++++-----------
 .../flink/state/forst/ForStStateDataTransfer.java  |  7 +++--
 .../flink/state/forst/fs/ForStFlinkFileSystem.java | 33 +++++++++++-----------
 .../forst/fs/filemapping/FileMappingManager.java   | 15 +++-------
 .../restore/ForStIncrementalRestoreOperation.java  |  5 ++--
 .../state/forst/ForStStateBackendConfigTest.java   |  3 +-
 .../state/forst/fs/FileMappingManagerTest.java     | 15 ++++++++--
 9 files changed, 52 insertions(+), 54 deletions(-)

diff --git a/flink-dist/src/main/resources/META-INF/NOTICE 
b/flink-dist/src/main/resources/META-INF/NOTICE
index 4007febfb53..2d28859adaa 100644
--- a/flink-dist/src/main/resources/META-INF/NOTICE
+++ b/flink-dist/src/main/resources/META-INF/NOTICE
@@ -9,7 +9,7 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - com.google.code.findbugs:jsr305:1.3.9
 - com.twitter:chill-java:0.7.6
 - com.ververica:frocksdbjni:8.10.0-ververica-beta-1.0
-- com.ververica:forstjni:0.1.4-beta
+- com.ververica:forstjni:0.1.5
 - commons-cli:commons-cli:1.5.0
 - commons-collections:commons-collections:3.2.2
 - commons-io:commons-io:2.15.1
diff --git a/flink-state-backends/flink-statebackend-forst/pom.xml 
b/flink-state-backends/flink-statebackend-forst/pom.xml
index f6073c889e4..d3e5740fdef 100644
--- a/flink-state-backends/flink-statebackend-forst/pom.xml
+++ b/flink-state-backends/flink-statebackend-forst/pom.xml
@@ -63,7 +63,7 @@ under the License.
                <dependency>
                        <groupId>com.ververica</groupId>
                        <artifactId>forstjni</artifactId>
-                       <version>0.1.4-beta</version>
+                       <version>0.1.5</version>
                </dependency>
 
                <!-- test dependencies -->
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index 08ee1a7f7fa..3d025d5ebdd 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -78,8 +78,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
     // the filename length limit is 255 on most operating systems
     // In rocksdb, if db_log_dir is non empty, the log files will be in the 
specified dir,
     // and the db data dir's absolute path will be used as the log file name's 
prefix.
-    private static final int INSTANCE_PATH_LENGTH_LIMIT =
-            255 / 2 - FORST_RELOCATE_LOG_SUFFIX.length();
+    private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - 
FORST_RELOCATE_LOG_SUFFIX.length();
 
     @Nullable private FlinkEnv flinkEnv = null;
 
@@ -396,9 +395,8 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         }
     }
 
-    private void clearDirectories(Path basePath) throws IOException {
-        FileSystem fileSystem =
-                forStFileSystem != null ? forStFileSystem : 
basePath.getFileSystem();
+    private static void clearDirectories(Path basePath) throws IOException {
+        FileSystem fileSystem = basePath.getFileSystem();
         if (fileSystem.exists(basePath)) {
             fileSystem.delete(basePath, true);
         }
@@ -485,19 +483,11 @@ public final class ForStResourceContainer implements 
AutoCloseable {
 
         String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR);
         if (logDir == null || logDir.isEmpty()) {
-            if (localForStPath == null
-                    || localForStPath.getPath().length() <= 
INSTANCE_PATH_LENGTH_LIMIT) {
+            // only relocate db log dir in local mode
+            if (remoteForStPath == null
+                    && localForStPath != null
+                    && localForStPath.getPath().length() <= 
INSTANCE_PATH_LENGTH_LIMIT) {
                 relocateDefaultDbLogDir(currentOptions);
-            } else if (remoteForStPath != null) { // log must put in local
-                Path relocatedPath = localForStPath.getParent().getParent();
-                LOG.warn("ForSt remote path is not null, relocate log in  
{}.", relocatedPath);
-                currentOptions.setDbLogDir(relocatedPath.toString());
-            } else {
-                // disable log relocate when instance path length exceeds 
limit to prevent ForSt
-                // log file creation failure, details in FLINK-31743
-                LOG.warn(
-                        "ForSt local path length exceeds limit : {}, disable 
log relocate.",
-                        localForStPath);
             }
         } else {
             currentOptions.setDbLogDir(logDir);
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java
index a0877f5bb80..5a26776bc63 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateDataTransfer.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.CheckpointedStateScope;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
@@ -38,6 +39,8 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -66,13 +69,13 @@ public class ForStStateDataTransfer implements Closeable {
 
     protected final ExecutorService executorService;
 
-    private final FileSystem forStFs;
+    @Nullable private final ForStFlinkFileSystem forStFs;
 
     public ForStStateDataTransfer(int threadNum) {
         this(threadNum, null);
     }
 
-    public ForStStateDataTransfer(int threadNum, FileSystem forStFs) {
+    public ForStStateDataTransfer(int threadNum, ForStFlinkFileSystem forStFs) 
{
         this.forStFs = forStFs;
         if (threadNum > 1) {
             executorService =
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index 433f5f15c27..f7c0ec0fc3e 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -136,8 +136,8 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode 
overwriteMode)
-            throws IOException {
+    public synchronized ByteBufferWritableFSDataOutputStream create(
+            Path path, WriteMode overwriteMode) throws IOException {
         FileMappingManager.RealPath realPath = 
fileMappingManager.createFile(path);
         if (realPath.isLocal) {
             return new ByteBufferWritableFSDataOutputStream(
@@ -152,7 +152,8 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) 
throws IOException {
+    public synchronized ByteBufferReadableFSDataInputStream open(Path path, 
int bufferSize)
+            throws IOException {
         FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
         Preconditions.checkNotNull(realPath);
         if (realPath.isLocal) {
@@ -176,7 +177,7 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public ByteBufferReadableFSDataInputStream open(Path path) throws 
IOException {
+    public synchronized ByteBufferReadableFSDataInputStream open(Path path) 
throws IOException {
         FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
         Preconditions.checkNotNull(realPath);
         if (realPath.isLocal) {
@@ -200,27 +201,27 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public boolean rename(Path src, Path dst) throws IOException {
+    public synchronized boolean rename(Path src, Path dst) throws IOException {
         return fileMappingManager.renameFile(src.toString(), dst.toString());
     }
 
     @Override
-    public Path getWorkingDirectory() {
+    public synchronized Path getWorkingDirectory() {
         return delegateFS.getWorkingDirectory();
     }
 
     @Override
-    public Path getHomeDirectory() {
+    public synchronized Path getHomeDirectory() {
         return delegateFS.getHomeDirectory();
     }
 
     @Override
-    public URI getUri() {
+    public synchronized URI getUri() {
         return delegateFS.getUri();
     }
 
     @Override
-    public boolean exists(final Path f) throws IOException {
+    public synchronized boolean exists(final Path f) throws IOException {
         FileMappingManager.RealPath realPath = fileMappingManager.realPath(f);
         if (realPath == null) {
             return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir();
@@ -239,7 +240,7 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public FileStatus getFileStatus(Path path) throws IOException {
+    public synchronized FileStatus getFileStatus(Path path) throws IOException 
{
         FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
         Preconditions.checkNotNull(realPath);
         if (realPath.isLocal) {
@@ -249,7 +250,7 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 
long len)
+    public synchronized BlockLocation[] getFileBlockLocations(FileStatus file, 
long start, long len)
             throws IOException {
         Path path = file.getPath();
         FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
@@ -262,7 +263,7 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public FileStatus[] listStatus(Path path) throws IOException {
+    public synchronized FileStatus[] listStatus(Path path) throws IOException {
         // mapping files
         List<FileStatus> fileStatuses = new ArrayList<>();
         String pathStr = path.toString();
@@ -281,7 +282,7 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public boolean delete(Path path, boolean recursive) throws IOException {
+    public synchronized boolean delete(Path path, boolean recursive) throws 
IOException {
         boolean success = fileMappingManager.deleteFile(path, recursive);
         if (fileBasedCache != null) {
             // only new generated file will put into cache, no need to 
consider file mapping
@@ -291,16 +292,16 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     @Override
-    public boolean mkdirs(Path path) throws IOException {
+    public synchronized boolean mkdirs(Path path) throws IOException {
         return delegateFS.mkdirs(path);
     }
 
     @Override
-    public boolean isDistributedFS() {
+    public synchronized boolean isDistributedFS() {
         return delegateFS.isDistributedFS();
     }
 
-    public int link(Path src, Path dst) throws IOException {
+    public synchronized int link(Path src, Path dst) throws IOException {
         return fileMappingManager.link(src.toString(), dst.toString());
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
index d935a5e4d08..b54e4e0844d 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
@@ -70,7 +70,7 @@ public class FileMappingManager {
     public RealPath createFile(Path file) {
         String fileName = file.toString();
         Preconditions.checkState(!mappingTable.containsKey(fileName));
-        if (!fileName.endsWith(SST_SUFFIX) && fileName.startsWith(remoteBase)) 
{
+        if (!fileName.endsWith(SST_SUFFIX) && isParentDir(fileName, 
remoteBase)) {
             Path localFile = new Path(localBase, file.getName());
             mappingTable.put(
                     fileName,
@@ -92,16 +92,9 @@ public class FileMappingManager {
             return -1;
         }
         MappingEntry sourceEntry = mappingTable.get(src);
-        if (sourceEntry != null) {
-            sourceEntry.retain();
-            mappingTable.putIfAbsent(dst, sourceEntry);
-        } else {
-            sourceEntry = new MappingEntry(0, fileSystem, src, false, false);
-            sourceEntry.retain();
-            mappingTable.put(src, sourceEntry);
-            sourceEntry.retain();
-            mappingTable.put(dst, sourceEntry);
-        }
+        Preconditions.checkNotNull(sourceEntry);
+        sourceEntry.retain();
+        mappingTable.putIfAbsent(dst, sourceEntry);
         LOG.trace("link: {} -> {}", dst, src);
         return 0;
     }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
index fc9a1051df1..ab8d76541da 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
@@ -253,9 +253,10 @@ public class ForStIncrementalRestoreOperation<K> 
implements ForStRestoreOperatio
     }
 
     private void transferAllStateHandles(List<StateHandleTransferSpec> specs) 
throws Exception {
-        FileSystem forStFs = getFileSystem(optionsContainer.getBasePath());
         try (ForStStateDataTransfer transfer =
-                new 
ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs)) {
+                new ForStStateDataTransfer(
+                        ForStStateDataTransfer.DEFAULT_THREAD_NUM,
+                        optionsContainer.getFileSystem())) {
             transfer.transferAllStateDataToDirectory(specs, 
cancelStreamRegistry);
         }
     }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
index b7b357f2ba7..f106fef31fe 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
@@ -93,7 +93,8 @@ public class ForStStateBackendConfigTest {
         final File logFile = File.createTempFile(getClass().getSimpleName() + 
"-", ".log");
         // set the environment variable 'log.file' with the Flink log file 
location
         System.setProperty("log.file", logFile.getPath());
-        try (ForStResourceContainer container = 
backend.createOptionsAndResourceContainer(null)) {
+        try (ForStResourceContainer container =
+                backend.createOptionsAndResourceContainer(new 
Path(tempFolder.toString()))) {
             assertEquals(
                     ForStConfigurableOptions.LOG_LEVEL.defaultValue(),
                     container.getDbOptions().infoLogLevel());
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
index 081e12d690c..77b24a0861b 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
@@ -43,6 +43,7 @@ public class FileMappingManagerTest {
         FSDataOutputStream os = localFS.create(new Path(src), 
FileSystem.WriteMode.OVERWRITE);
         os.write(233);
         os.close();
+        fileMappingManager.createFile(new Path(src));
         String dst = tempDir.toString() + "/dst";
         fileMappingManager.link(src, dst);
         assertThat(fileMappingManager.realPath(new 
Path(dst)).path.toString()).isEqualTo(src);
@@ -60,6 +61,7 @@ public class FileMappingManagerTest {
         FSDataOutputStream os = localFS.create(new Path(src), 
FileSystem.WriteMode.OVERWRITE);
         os.write(233);
         os.close();
+        fileMappingManager.createFile(new Path(src));
         String dstB = tempDir.toString() + "/b";
         fileMappingManager.link(src, dstB);
         assertThat(fileMappingManager.realPath(new 
Path(dstB)).path.toString()).isEqualTo(src);
@@ -87,6 +89,7 @@ public class FileMappingManagerTest {
         FSDataOutputStream os = localFS.create(new Path(src), 
FileSystem.WriteMode.OVERWRITE);
         os.write(233);
         os.close();
+        fileMappingManager.createFile(new Path(src));
         String dst = tempDir.toString() + "/dst";
         fileMappingManager.link(src, dst);
         // delete src
@@ -102,13 +105,15 @@ public class FileMappingManagerTest {
     void testDirectoryDelete() throws IOException {
         FileSystem localFS = FileSystem.getLocalFileSystem();
         FileMappingManager fileMappingManager =
-                new FileMappingManager(localFS, localFS, tempDir.toString(), 
tempDir.toString());
+                new FileMappingManager(
+                        localFS, localFS, tempDir.toString() + "/db", 
tempDir.toString() + "/db");
         String testDir = tempDir + "/testDir";
         localFS.mkdirs(new Path(testDir));
         String src = testDir + "/source";
         FSDataOutputStream os = localFS.create(new Path(src), 
FileSystem.WriteMode.OVERWRITE);
         os.write(233);
         os.close();
+        fileMappingManager.createFile(new Path(src));
         String dst = tempDir.toString() + "/dst";
         fileMappingManager.link(src, dst);
 
@@ -127,13 +132,15 @@ public class FileMappingManagerTest {
     void testDirectoryRename() throws IOException {
         FileSystem localFS = FileSystem.getLocalFileSystem();
         FileMappingManager fileMappingManager =
-                new FileMappingManager(localFS, localFS, tempDir.toString(), 
tempDir.toString());
+                new FileMappingManager(
+                        localFS, localFS, tempDir.toString() + "/db", 
tempDir.toString() + "/db");
         String testDir = tempDir + "/testDir";
         localFS.mkdirs(new Path(testDir));
         String src = testDir + "/source";
         FSDataOutputStream os = localFS.create(new Path(src), 
FileSystem.WriteMode.OVERWRITE);
         os.write(233);
         os.close();
+        fileMappingManager.createFile(new Path(src));
 
         String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp";
         localFS.mkdirs(new Path(linkedDirTmp));
@@ -175,13 +182,15 @@ public class FileMappingManagerTest {
     void testCreateFileBeforeRename() throws IOException {
         FileSystem localFS = FileSystem.getLocalFileSystem();
         FileMappingManager fileMappingManager =
-                new FileMappingManager(localFS, localFS, tempDir.toString(), 
tempDir.toString());
+                new FileMappingManager(
+                        localFS, localFS, tempDir.toString() + "/db", 
tempDir.toString() + "/db");
         String testDir = tempDir + "/testDir";
         localFS.mkdirs(new Path(testDir));
         String src = testDir + "/source";
         FSDataOutputStream os = localFS.create(new Path(src), 
FileSystem.WriteMode.OVERWRITE);
         os.write(233);
         os.close();
+        fileMappingManager.createFile(new Path(src));
 
         String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp";
         localFS.mkdirs(new Path(linkedDirTmp));

Reply via email to