This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 95bf63841dc [FLINK-37693][state] Avoid reading NOT_OWNED file by local
file system when restoring (#26497)
95bf63841dc is described below
commit 95bf63841dcdcf58cee057dafa14d7debcf64e77
Author: yhx <[email protected]>
AuthorDate: Mon Apr 28 19:24:07 2025 +0800
[FLINK-37693][state] Avoid reading NOT_OWNED file by local file system when
restoring (#26497)
---
.../flink/state/forst/fs/ForStFlinkFileSystem.java | 39 ++++++++++++++-----
.../forst/fs/filemapping/FileOwnershipDecider.java | 4 ++
.../state/forst/fs/ForStFlinkFileSystemTest.java | 45 ++++++++++++++++++++++
3 files changed, 79 insertions(+), 9 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index afbc5a71505..309f16b4452 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -38,6 +38,7 @@ import
org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.filemapping.FSDataOutputStreamWithEntry;
import
org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource;
import org.apache.flink.state.forst.fs.filemapping.FileMappingManager;
+import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource;
@@ -211,7 +212,11 @@ public class ForStFlinkFileSystem extends FileSystem
implements Closeable {
// Try to create file cache for SST files
CachedDataOutputStream cachedDataOutputStream =
- createCachedDataOutputStream(dbFilePath, sourceRealPath,
outputStream);
+ createCachedDataOutputStream(
+ dbFilePath,
+ sourceRealPath,
+ outputStream,
+ createdMappingEntry.getFileOwnership());
LOG.trace(
"Create file: dbFilePath: {}, sourceRealPath: {},
cachedDataOutputStream: {}",
@@ -233,7 +238,11 @@ public class ForStFlinkFileSystem extends FileSystem
implements Closeable {
() -> {
FSDataInputStream inputStream =
source.openInputStream(bufferSize);
CachedDataInputStream cachedDataInputStream =
- createCachedDataInputStream(dbFilePath, source,
inputStream);
+ createCachedDataInputStream(
+ dbFilePath,
+ source,
+ inputStream,
+ mappingEntry.getFileOwnership());
return cachedDataInputStream == null ? inputStream :
cachedDataInputStream;
},
DEFAULT_INPUT_STREAM_CAPACITY,
@@ -251,7 +260,11 @@ public class ForStFlinkFileSystem extends FileSystem
implements Closeable {
() -> {
FSDataInputStream inputStream = source.openInputStream();
CachedDataInputStream cachedDataInputStream =
- createCachedDataInputStream(dbFilePath, source,
inputStream);
+ createCachedDataInputStream(
+ dbFilePath,
+ source,
+ inputStream,
+ mappingEntry.getFileOwnership());
return cachedDataInputStream == null ? inputStream :
cachedDataInputStream;
},
DEFAULT_INPUT_STREAM_CAPACITY,
@@ -285,7 +298,7 @@ public class ForStFlinkFileSystem extends FileSystem
implements Closeable {
return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir();
}
- if (FileOwnershipDecider.shouldAlwaysBeLocal(f)) {
+ if (FileOwnershipDecider.shouldAlwaysBeLocal(f,
mappingEntry.getFileOwnership())) {
return localFS.exists(mappingEntry.getSourcePath());
} else {
// Should be protected with synchronized, since the file closing
is not an atomic
@@ -305,7 +318,7 @@ public class ForStFlinkFileSystem extends FileSystem
implements Closeable {
if (mappingEntry == null) {
return new FileStatusWrapper(delegateFS.getFileStatus(path), path);
}
- if (FileOwnershipDecider.shouldAlwaysBeLocal(path)) {
+ if (FileOwnershipDecider.shouldAlwaysBeLocal(path,
mappingEntry.getFileOwnership())) {
return new
FileStatusWrapper(localFS.getFileStatus(mappingEntry.getSourcePath()), path);
} else {
// Should be protected with synchronized, since the file closing
is not an atomic
@@ -401,9 +414,13 @@ public class ForStFlinkFileSystem extends FileSystem
implements Closeable {
}
private @Nullable CachedDataOutputStream createCachedDataOutputStream(
- Path dbFilePath, Path srcRealPath, FSDataOutputStream
outputStream) throws IOException {
+ Path dbFilePath,
+ Path srcRealPath,
+ FSDataOutputStream outputStream,
+ FileOwnership fileOwnership)
+ throws IOException {
// do not create cache for local files
- if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath)) {
+ if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath,
fileOwnership)) {
return null;
}
@@ -411,9 +428,13 @@ public class ForStFlinkFileSystem extends FileSystem
implements Closeable {
}
private @Nullable CachedDataInputStream createCachedDataInputStream(
- Path dbFilePath, MappingEntrySource source, FSDataInputStream
inputStream)
+ Path dbFilePath,
+ MappingEntrySource source,
+ FSDataInputStream inputStream,
+ FileOwnership fileOwnership)
throws IOException {
- if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath) ||
!source.cacheable()) {
+ if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership)
+ || !source.cacheable()) {
return null;
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java
index 909b4476270..119ec9b44d4 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java
@@ -43,6 +43,10 @@ public class FileOwnershipDecider {
return filePath.getName().endsWith(SST_SUFFIX);
}
+ public static boolean shouldAlwaysBeLocal(Path filePath, FileOwnership
fileOwnership) {
+ return !isSstFile(filePath) && fileOwnership !=
FileOwnership.NOT_OWNED;
+ }
+
public static boolean shouldAlwaysBeLocal(Path filePath) {
return !isSstFile(filePath);
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
index 50c3a1ed0ba..b6f61afdb66 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
@@ -30,11 +30,13 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.state.forst.fs.cache.BundledCacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
import org.apache.flink.state.forst.fs.cache.FileCacheEntry;
import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
+import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
@@ -426,6 +428,49 @@ public class ForStFlinkFileSystemTest {
assertFileStatusAndBlockLocations(fileSystem,
fileSystem.getFileStatus(sstRemotePath1));
}
+ @Test
+ public void testNotOwnedFileStatus() throws IOException {
+ org.apache.flink.core.fs.Path remotePath =
+ new org.apache.flink.core.fs.Path(tempDir.toString() +
"/remote");
+ org.apache.flink.core.fs.Path localPath =
+ new org.apache.flink.core.fs.Path(tempDir.toString() +
"/local");
+ ForStFlinkFileSystem fileSystem =
+ new ForStFlinkFileSystem(
+ // Return dummy file status which differs from real
local file system
+ new LocalFileSystem() {
+ @Override
+ public FileStatus
getFileStatus(org.apache.flink.core.fs.Path path) {
+ return new
ForStFlinkFileSystem.DummyFSFileStatus(path);
+ }
+ },
+ remotePath.toString(),
+ localPath.toString(),
+ null);
+ fileSystem.mkdirs(remotePath);
+ fileSystem.mkdirs(localPath);
+ org.apache.flink.core.fs.Path sstRemotePath1 =
+ new org.apache.flink.core.fs.Path(remotePath, "1.sst");
+ ByteBufferWritableFSDataOutputStream os1 =
fileSystem.create(sstRemotePath1);
+ os1.write(1);
+ os1.close();
+
+ // Mock restore procedure, getFileStatus should not use local file
system to access when the
+ // ownership is given to Flink
+ MappingEntry mappingEntry = fileSystem.getMappingEntry(sstRemotePath1);
+ assertThat(mappingEntry).isNotNull();
+ assertThat(mappingEntry.getSourcePath()).isNotNull();
+ FileStateHandle remoteFileStateHandle =
+ new FileStateHandle(mappingEntry.getSourcePath(), 1L);
+ fileSystem.registerReusedRestoredFile(
+ mappingEntry.getSourcePath().toString(),
remoteFileStateHandle, sstRemotePath1);
+ fileSystem.giveUpOwnership(sstRemotePath1, remoteFileStateHandle);
+
assertThat(fileSystem.getFileStatus(mappingEntry.getSourcePath()).getLen())
+ .isNotEqualTo(
+ FileSystem.getLocalFileSystem()
+ .getFileStatus(mappingEntry.getSourcePath())
+ .getLen());
+ }
+
@Test
public void testOverride() throws IOException {
org.apache.flink.core.fs.Path remotePath =