This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 19b7a8649eeec93cf8017d76881fda2fe0c30b7b
Author: Zakelly <[email protected]>
AuthorDate: Sun Mar 9 14:57:26 2025 +0800

    [FLINK-37437][state/forst] Unguarded file system to enable stream sharing between threads.
---
 .../flink/state/forst/fs/ForStFlinkFileSystem.java | 15 +++++-
 .../state/forst/fs/ForStFlinkFileSystemTest.java   | 59 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index 4e75d8d6369..e69612ad127 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -158,7 +158,11 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
             return null;
         }
         return new FileBasedCache(
-                config, cacheLimitPolicy, cacheBase.getFileSystem(), 
cacheBase, metricGroup);
+                config,
+                cacheLimitPolicy,
+                getUnguardedFileSystem(cacheBase),
+                cacheBase,
+                metricGroup);
     }
 
     public FileSystem getDelegateFS() {
@@ -195,7 +199,10 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
         Path sourceRealPath = source.getFilePath();
 
         // Create the actual file output stream
-        FileSystem fileSystem = sourceRealPath.getFileSystem();
+        // Use the file system WITHOUT safety net protection. The ForSt LOG file may be created by
+        // any thread but is shared among all threads, so it must not be auto-closed when a single
+        // thread exits.
+        FileSystem fileSystem = getUnguardedFileSystem(sourceRealPath);
         FSDataOutputStream outputStream = fileSystem.create(sourceRealPath, 
overwriteMode);
         // Bundle the output stream with the mapping entry, to close the entry 
when the stream is
         // closed.
@@ -398,6 +405,10 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
         fileMappingManager.giveUpOwnership(path, stateHandle);
     }
 
+    private static FileSystem getUnguardedFileSystem(Path path) throws 
IOException {
+        return FileSystem.getUnguardedFileSystem(path.toUri());
+    }
+
     private @Nullable CachedDataOutputStream createCachedDataOutputStream(
             Path dbFilePath, Path srcRealPath, FSDataOutputStream 
outputStream) throws IOException {
         // do not create cache for local files
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
index 84c9de5ff0f..d2c4a83bbd9 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.ByteBufferReadable;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.core.fs.local.LocalDataInputStream;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.metrics.Counter;
@@ -57,6 +58,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.state.forst.ForStOptions.CACHE_LRU_ACCESS_BEFORE_PROMOTION;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -413,6 +418,60 @@ public class ForStFlinkFileSystemTest {
         assertThat(is.read()).isEqualTo(79);
     }
 
+    @Test
+    public void testWriteStreamSharable() throws Exception {
+        org.apache.flink.core.fs.Path remotePath =
+                new org.apache.flink.core.fs.Path(tempDir.toString() + 
"/remote");
+        org.apache.flink.core.fs.Path localPath =
+                new org.apache.flink.core.fs.Path(tempDir.toString() + 
"/local");
+        org.apache.flink.core.fs.Path cachePath =
+                new org.apache.flink.core.fs.Path(tempDir.toString() + 
"/tmp-cache");
+        BundledCacheLimitPolicy cacheLimitPolicy =
+                new BundledCacheLimitPolicy(
+                        new SpaceBasedCacheLimitPolicy(new 
File(cachePath.toString()), 0, 0),
+                        new SizeBasedCacheLimitPolicy(250, 250));
+        FileBasedCache cache =
+                new FileBasedCache(
+                        new Configuration(),
+                        cacheLimitPolicy,
+                        FileSystem.getLocalFileSystem(),
+                        cachePath,
+                        new UnregisteredMetricsGroup());
+        ForStFlinkFileSystem fileSystem =
+                new ForStFlinkFileSystem(
+                        new ByteBufferReadableLocalFileSystem(),
+                        remotePath.toString(),
+                        localPath.toString(),
+                        cache);
+
+        // A write stream is created by one thread and can be used by another thread.
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        org.apache.flink.core.fs.Path sstRemotePath =
+                new org.apache.flink.core.fs.Path(remotePath, "1.sst");
+        AtomicReference<ByteBufferWritableFSDataOutputStream> writeStream = 
new AtomicReference<>();
+        executor.submit(
+                        () -> {
+                            FileSystemSafetyNet.initializeSafetyNetForThread();
+                            try {
+                                
writeStream.set(fileSystem.create(sstRemotePath));
+                                writeStream.get().write(1);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            } finally {
+                                
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+                            }
+                        })
+                .get(100, TimeUnit.SECONDS);
+        executor.shutdown();
+        assertThat(writeStream.get()).isNotNull();
+        // won't throw exception here
+        writeStream.get().write(2);
+        writeStream.get().close();
+        ByteBufferReadableFSDataInputStream is = 
fileSystem.open(sstRemotePath);
+        assertThat(is.read()).isEqualTo(1);
+        assertThat(is.read()).isEqualTo(2);
+    }
+
     private static void assertFileStatusAndBlockLocations(
             FileSystem fileSystem, FileStatus fileStatus) throws IOException {
         BlockLocation[] blockLocations =

Reply via email to