This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 19b7a8649eeec93cf8017d76881fda2fe0c30b7b Author: Zakelly <[email protected]> AuthorDate: Sun Mar 9 14:57:26 2025 +0800 [FLINK-37437][state/forst] Unguarded file system to enable stream sharing between threads. --- .../flink/state/forst/fs/ForStFlinkFileSystem.java | 15 +++++- .../state/forst/fs/ForStFlinkFileSystemTest.java | 59 ++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index 4e75d8d6369..e69612ad127 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -158,7 +158,11 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable { return null; } return new FileBasedCache( - config, cacheLimitPolicy, cacheBase.getFileSystem(), cacheBase, metricGroup); + config, + cacheLimitPolicy, + getUnguardedFileSystem(cacheBase), + cacheBase, + metricGroup); } public FileSystem getDelegateFS() { @@ -195,7 +199,10 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable { Path sourceRealPath = source.getFilePath(); // Create the actual file output stream - FileSystem fileSystem = sourceRealPath.getFileSystem(); + // Should use the one WITHOUT safety net protection. The reason is that the ForSt LOG file + // might be created by any thread but is shared among all threads, so we cannot let the LOG + // file be auto-closed when one thread exits.
+ FileSystem fileSystem = getUnguardedFileSystem(sourceRealPath); FSDataOutputStream outputStream = fileSystem.create(sourceRealPath, overwriteMode); // Bundle the output stream with the mapping entry, to close the entry when the stream is // closed. @@ -398,6 +405,10 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable { fileMappingManager.giveUpOwnership(path, stateHandle); } + private static FileSystem getUnguardedFileSystem(Path path) throws IOException { + return FileSystem.getUnguardedFileSystem(path.toUri()); + } + private @Nullable CachedDataOutputStream createCachedDataOutputStream( Path dbFilePath, Path srcRealPath, FSDataOutputStream outputStream) throws IOException { // do not create cache for local files diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java index 84c9de5ff0f..d2c4a83bbd9 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.ByteBufferReadable; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemSafetyNet; import org.apache.flink.core.fs.local.LocalDataInputStream; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.metrics.Counter; @@ -57,6 +58,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.state.forst.ForStOptions.CACHE_LRU_ACCESS_BEFORE_PROMOTION; import static org.assertj.core.api.Assertions.assertThat; @@ -413,6 +418,60 @@ public class ForStFlinkFileSystemTest { assertThat(is.read()).isEqualTo(79); } + @Test + public void testWriteStreamSharable() throws Exception { + org.apache.flink.core.fs.Path remotePath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/remote"); + org.apache.flink.core.fs.Path localPath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/local"); + org.apache.flink.core.fs.Path cachePath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/tmp-cache"); + BundledCacheLimitPolicy cacheLimitPolicy = + new BundledCacheLimitPolicy( + new SpaceBasedCacheLimitPolicy(new File(cachePath.toString()), 0, 0), + new SizeBasedCacheLimitPolicy(250, 250)); + FileBasedCache cache = + new FileBasedCache( + new Configuration(), + cacheLimitPolicy, + FileSystem.getLocalFileSystem(), + cachePath, + new UnregisteredMetricsGroup()); + ForStFlinkFileSystem fileSystem = + new ForStFlinkFileSystem( + new ByteBufferReadableLocalFileSystem(), + remotePath.toString(), + localPath.toString(), + cache); + + // A write stream created by one thread can be used by another thread.
+ ExecutorService executor = Executors.newFixedThreadPool(1); + org.apache.flink.core.fs.Path sstRemotePath = + new org.apache.flink.core.fs.Path(remotePath, "1.sst"); + AtomicReference<ByteBufferWritableFSDataOutputStream> writeStream = new AtomicReference<>(); + executor.submit( + () -> { + FileSystemSafetyNet.initializeSafetyNetForThread(); + try { + writeStream.set(fileSystem.create(sstRemotePath)); + writeStream.get().write(1); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); + } + }) + .get(100, TimeUnit.SECONDS); + executor.shutdown(); + assertThat(writeStream.get()).isNotNull(); + // won't throw exception here + writeStream.get().write(2); + writeStream.get().close(); + ByteBufferReadableFSDataInputStream is = fileSystem.open(sstRemotePath); + assertThat(is.read()).isEqualTo(1); + assertThat(is.read()).isEqualTo(2); + } + private static void assertFileStatusAndBlockLocations( FileSystem fileSystem, FileStatus fileStatus) throws IOException { BlockLocation[] blockLocations =
