This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 95bf63841dc [FLINK-37693][state] Avoid reading NOT_OWNED file by local file system when restoring (#26497)
95bf63841dc is described below

commit 95bf63841dcdcf58cee057dafa14d7debcf64e77
Author: yhx <[email protected]>
AuthorDate: Mon Apr 28 19:24:07 2025 +0800

    [FLINK-37693][state] Avoid reading NOT_OWNED file by local file system when restoring (#26497)
---
 .../flink/state/forst/fs/ForStFlinkFileSystem.java | 39 ++++++++++++++-----
 .../forst/fs/filemapping/FileOwnershipDecider.java |  4 ++
 .../state/forst/fs/ForStFlinkFileSystemTest.java   | 45 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 9 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index afbc5a71505..309f16b4452 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
 import org.apache.flink.state.forst.fs.filemapping.FSDataOutputStreamWithEntry;
 import 
org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource;
 import org.apache.flink.state.forst.fs.filemapping.FileMappingManager;
+import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
 import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
 import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
 import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource;
@@ -211,7 +212,11 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
 
         // Try to create file cache for SST files
         CachedDataOutputStream cachedDataOutputStream =
-                createCachedDataOutputStream(dbFilePath, sourceRealPath, 
outputStream);
+                createCachedDataOutputStream(
+                        dbFilePath,
+                        sourceRealPath,
+                        outputStream,
+                        createdMappingEntry.getFileOwnership());
 
         LOG.trace(
                 "Create file: dbFilePath: {}, sourceRealPath: {}, 
cachedDataOutputStream: {}",
@@ -233,7 +238,11 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
                 () -> {
                     FSDataInputStream inputStream = 
source.openInputStream(bufferSize);
                     CachedDataInputStream cachedDataInputStream =
-                            createCachedDataInputStream(dbFilePath, source, 
inputStream);
+                            createCachedDataInputStream(
+                                    dbFilePath,
+                                    source,
+                                    inputStream,
+                                    mappingEntry.getFileOwnership());
                     return cachedDataInputStream == null ? inputStream : 
cachedDataInputStream;
                 },
                 DEFAULT_INPUT_STREAM_CAPACITY,
@@ -251,7 +260,11 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
                 () -> {
                     FSDataInputStream inputStream = source.openInputStream();
                     CachedDataInputStream cachedDataInputStream =
-                            createCachedDataInputStream(dbFilePath, source, 
inputStream);
+                            createCachedDataInputStream(
+                                    dbFilePath,
+                                    source,
+                                    inputStream,
+                                    mappingEntry.getFileOwnership());
                     return cachedDataInputStream == null ? inputStream : 
cachedDataInputStream;
                 },
                 DEFAULT_INPUT_STREAM_CAPACITY,
@@ -285,7 +298,7 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
             return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir();
         }
 
-        if (FileOwnershipDecider.shouldAlwaysBeLocal(f)) {
+        if (FileOwnershipDecider.shouldAlwaysBeLocal(f, 
mappingEntry.getFileOwnership())) {
             return localFS.exists(mappingEntry.getSourcePath());
         } else {
             // Should be protected with synchronized, since the file closing 
is not an atomic
@@ -305,7 +318,7 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
         if (mappingEntry == null) {
             return new FileStatusWrapper(delegateFS.getFileStatus(path), path);
         }
-        if (FileOwnershipDecider.shouldAlwaysBeLocal(path)) {
+        if (FileOwnershipDecider.shouldAlwaysBeLocal(path, 
mappingEntry.getFileOwnership())) {
             return new 
FileStatusWrapper(localFS.getFileStatus(mappingEntry.getSourcePath()), path);
         } else {
             // Should be protected with synchronized, since the file closing 
is not an atomic
@@ -401,9 +414,13 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
     }
 
     private @Nullable CachedDataOutputStream createCachedDataOutputStream(
-            Path dbFilePath, Path srcRealPath, FSDataOutputStream 
outputStream) throws IOException {
+            Path dbFilePath,
+            Path srcRealPath,
+            FSDataOutputStream outputStream,
+            FileOwnership fileOwnership)
+            throws IOException {
         // do not create cache for local files
-        if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath)) {
+        if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, 
fileOwnership)) {
             return null;
         }
 
@@ -411,9 +428,13 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
     }
 
     private @Nullable CachedDataInputStream createCachedDataInputStream(
-            Path dbFilePath, MappingEntrySource source, FSDataInputStream 
inputStream)
+            Path dbFilePath,
+            MappingEntrySource source,
+            FSDataInputStream inputStream,
+            FileOwnership fileOwnership)
             throws IOException {
-        if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath) || 
!source.cacheable()) {
+        if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership)
+                || !source.cacheable()) {
             return null;
         }
 
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java
index 909b4476270..119ec9b44d4 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java
@@ -43,6 +43,10 @@ public class FileOwnershipDecider {
         return filePath.getName().endsWith(SST_SUFFIX);
     }
 
+    public static boolean shouldAlwaysBeLocal(Path filePath, FileOwnership fileOwnership) {
+        return !isSstFile(filePath) && fileOwnership != FileOwnership.NOT_OWNED;
+    }
+
     public static boolean shouldAlwaysBeLocal(Path filePath) {
         return !isSstFile(filePath);
     }
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
index 50c3a1ed0ba..b6f61afdb66 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
@@ -30,11 +30,13 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.state.forst.fs.cache.BundledCacheLimitPolicy;
 import org.apache.flink.state.forst.fs.cache.FileBasedCache;
 import org.apache.flink.state.forst.fs.cache.FileCacheEntry;
 import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy;
 import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
+import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
@@ -426,6 +428,49 @@ public class ForStFlinkFileSystemTest {
         assertFileStatusAndBlockLocations(fileSystem, 
fileSystem.getFileStatus(sstRemotePath1));
     }
 
+    @Test
+    public void testNotOwnedFileStatus() throws IOException {
+        org.apache.flink.core.fs.Path remotePath =
+                new org.apache.flink.core.fs.Path(tempDir.toString() + 
"/remote");
+        org.apache.flink.core.fs.Path localPath =
+                new org.apache.flink.core.fs.Path(tempDir.toString() + 
"/local");
+        ForStFlinkFileSystem fileSystem =
+                new ForStFlinkFileSystem(
+                        // Return dummy file status which differs from real 
local file system
+                        new LocalFileSystem() {
+                            @Override
+                            public FileStatus 
getFileStatus(org.apache.flink.core.fs.Path path) {
+                                return new 
ForStFlinkFileSystem.DummyFSFileStatus(path);
+                            }
+                        },
+                        remotePath.toString(),
+                        localPath.toString(),
+                        null);
+        fileSystem.mkdirs(remotePath);
+        fileSystem.mkdirs(localPath);
+        org.apache.flink.core.fs.Path sstRemotePath1 =
+                new org.apache.flink.core.fs.Path(remotePath, "1.sst");
+        ByteBufferWritableFSDataOutputStream os1 = 
fileSystem.create(sstRemotePath1);
+        os1.write(1);
+        os1.close();
+
+        // Mock restore procedure, getFileStatus should not use local file 
system to access when the
+        // ownership is given to Flink
+        MappingEntry mappingEntry = fileSystem.getMappingEntry(sstRemotePath1);
+        assertThat(mappingEntry).isNotNull();
+        assertThat(mappingEntry.getSourcePath()).isNotNull();
+        FileStateHandle remoteFileStateHandle =
+                new FileStateHandle(mappingEntry.getSourcePath(), 1L);
+        fileSystem.registerReusedRestoredFile(
+                mappingEntry.getSourcePath().toString(), 
remoteFileStateHandle, sstRemotePath1);
+        fileSystem.giveUpOwnership(sstRemotePath1, remoteFileStateHandle);
+        
assertThat(fileSystem.getFileStatus(mappingEntry.getSourcePath()).getLen())
+                .isNotEqualTo(
+                        FileSystem.getLocalFileSystem()
+                                .getFileStatus(mappingEntry.getSourcePath())
+                                .getLen());
+    }
+
     @Test
     public void testOverride() throws IOException {
         org.apache.flink.core.fs.Path remotePath =

Reply via email to