This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 62694fae763b9421dcb641f664cfc8a26ad71eb9 Author: Zakelly <[email protected]> AuthorDate: Mon Mar 10 03:23:04 2025 +0800 [FLINK-37437][state/forst] Bundle file cache entry with mapping entry and release properly --- .../flink/state/forst/fs/ForStFlinkFileSystem.java | 19 +++++----------- .../forst/fs/filemapping/FileMappingManager.java | 23 ++++++++++++++------ .../state/forst/fs/filemapping/MappingEntry.java | 19 ++++++++++++++++ .../state/forst/fs/FileMappingManagerTest.java | 4 ++-- .../state/forst/fs/ForStFlinkFileSystemTest.java | 25 ++++++++++++++++++++++ 5 files changed, 67 insertions(+), 23 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index e69612ad127..afbc5a71505 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -191,7 +191,8 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable { Path dbFilePath, WriteMode overwriteMode) throws IOException { // Create a file in the mapping table MappingEntry createdMappingEntry = - fileMappingManager.createNewFile(dbFilePath, overwriteMode == WriteMode.OVERWRITE); + fileMappingManager.createNewFile( + dbFilePath, overwriteMode == WriteMode.OVERWRITE, fileBasedCache); // The source must be backed by a file FileBackedMappingEntrySource source = @@ -359,14 +360,7 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable { @Override public synchronized boolean delete(Path path, boolean recursive) throws IOException { - MappingEntry mappingEntry = fileMappingManager.mappingEntry(path.toString()); - boolean success = fileMappingManager.deleteFileOrDirectory(path, recursive); - if (fileBasedCache != null && mappingEntry != null) { - // if mappingEntry is not null, it means it is a file, not directory - MappingEntrySource source = mappingEntry.getSource(); - fileBasedCache.delete(source.getFilePath()); - } - return success; + return fileMappingManager.deleteFileOrDirectory(path, recursive); } @Override @@ -390,11 +384,8 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable { public synchronized void registerReusedRestoredFile( String key, StreamStateHandle stateHandle, Path dbFilePath) { MappingEntry mappingEntry = - fileMappingManager.registerReusedRestoredFile(key, stateHandle, dbFilePath); - if (fileBasedCache != null) { - fileBasedCache.registerInCache( - mappingEntry.getSourcePath(), stateHandle.getStateSize()); - } + fileMappingManager.registerReusedRestoredFile( + key, stateHandle, dbFilePath, fileBasedCache); } public synchronized @Nullable MappingEntry getMappingEntry(Path path) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java index a9709b06b1b..ca2ff8dbec4 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.state.forst.fs.cache.FileBasedCache; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -61,9 +62,10 @@ public class FileMappingManager { } /** Create a new file in the mapping table. */ - public MappingEntry createNewFile(Path filePath, boolean overwrite) { + public MappingEntry createNewFile(Path filePath, boolean overwrite, FileBasedCache cache) { String key = filePath.toString(); - if (FileOwnershipDecider.shouldAlwaysBeLocal(filePath)) { + boolean isLocal = FileOwnershipDecider.shouldAlwaysBeLocal(filePath); + if (isLocal) { filePath = forceLocalPath(filePath); } @@ -71,23 +73,27 @@ public class FileMappingManager { key, toUUIDPath(filePath), FileOwnershipDecider.decideForNewFile(filePath), + isLocal ? null : cache, true, overwrite); } /** Register a file restored from checkpoints to the mapping table. */ public MappingEntry registerReusedRestoredFile( - String key, StreamStateHandle stateHandle, Path dbFilePath) { + String key, StreamStateHandle stateHandle, Path dbFilePath, FileBasedCache cache) { // The checkpoint file may contain only the UUID without the file extension, so we: // - Decide file ownership based on dbFilePath, so we can know the real file type. // - Add to mapping table based on cpFilePath, so we can access the real file. LOG.trace("decide restored file ownership based on dbFilePath: {}", dbFilePath); return addHandleBackedFileToMappingTable( - key, stateHandle, FileOwnershipDecider.decideForRestoredFile(dbFilePath)); + key, stateHandle, FileOwnershipDecider.decideForRestoredFile(dbFilePath), cache); } private MappingEntry addHandleBackedFileToMappingTable( - String key, StreamStateHandle stateHandle, FileOwnership fileOwnership) { + String key, + StreamStateHandle stateHandle, + FileOwnership fileOwnership, + FileBasedCache cache) { MappingEntrySource source = new HandleBackedMappingEntrySource(stateHandle); MappingEntry existingEntry = mappingTable.getOrDefault(key, null); if (existingEntry != null) { @@ -105,7 +111,8 @@ public class FileMappingManager { LOG.trace("Skip adding a file that already exists in mapping table: {}", key); } return existingEntry == null - ? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, false)) + ? addMappingEntry( + key, new MappingEntry(1, source, fileOwnership, cache, false, false)) : existingEntry; } @@ -113,6 +120,7 @@ public class FileMappingManager { String key, Path filePath, FileOwnership fileOwnership, + FileBasedCache cache, boolean writing, boolean overwrite) { MappingEntrySource source = new FileBackedMappingEntrySource(filePath); @@ -144,7 +152,8 @@ public class FileMappingManager { } } return existingEntry == null - ? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, writing)) + ? addMappingEntry( + key, new MappingEntry(1, source, fileOwnership, cache, false, writing)) : existingEntry; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java index 692a7f7f1a2..130ff03b539 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java @@ -21,6 +21,7 @@ package org.apache.flink.state.forst.fs.filemapping; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.asyncprocessing.ReferenceCounted; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.state.forst.fs.cache.FileBasedCache; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -28,6 +29,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; + /** * A file mapping entry that encapsulates source and destination path. Source Path : dest Path = 1 : * N. @@ -40,6 +43,8 @@ public class MappingEntry extends ReferenceCounted { FileOwnership fileOwnership; + final @Nullable FileBasedCache cache; + final boolean isDirectory; volatile boolean writing; @@ -56,6 +61,7 @@ public class MappingEntry extends ReferenceCounted { initReference, new HandleBackedMappingEntrySource(stateHandle), fileOwnership, + null, isDirectory, false); } @@ -66,6 +72,7 @@ public class MappingEntry extends ReferenceCounted { initReference, new FileBackedMappingEntrySource(sourcePath), fileOwnership, + null, isDirectory, false); } @@ -74,14 +81,23 @@ public class MappingEntry extends ReferenceCounted { int initReference, MappingEntrySource source, FileOwnership fileOwnership, + FileBasedCache cache, boolean isDirectory, boolean writing) { super(initReference); this.source = source; this.parentDir = null; this.fileOwnership = fileOwnership; + this.cache = cache; this.isDirectory = isDirectory; this.writing = writing; + if (!writing && cache != null && !isDirectory && source.cacheable()) { + try { + cache.registerInCache(source.getFilePath(), source.getSize()); + } catch (IOException e) { + LOG.warn("Failed to register file {} in cache.", source, e); + } + } } public void setFileOwnership(FileOwnership ownership) { @@ -133,6 +149,9 @@ public class MappingEntry extends ReferenceCounted { return; } source.delete(isDirectory); + if (cache != null && !isDirectory && source.cacheable()) { + cache.delete(source.getFilePath()); + } } catch (Exception e) { LOG.warn("Failed to delete file {}.", source, e); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java index 662f006a26b..829e65077f9 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java @@ -53,9 +53,9 @@ public class FileMappingManagerTest { private MappingEntry registerFile(FileMappingManager manager, Path filePath) { if (reuseCp) { return manager.registerReusedRestoredFile( - filePath.toString(), new FileStateHandle(filePath, 0), filePath); + filePath.toString(), new FileStateHandle(filePath, 0), filePath, null); } else { - return manager.createNewFile(filePath, false); + return manager.createNewFile(filePath, false, null); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java index d2c4a83bbd9..96a6904db50 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java @@ -348,6 +348,31 @@ public class ForStFlinkFileSystemTest { assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L); is.close(); + + // test link and deleted by reference + long waitLoaded = 0L; + while (waitLoaded < 30000L && cacheEntry1.getReferenceCount() <= 0) { + try { + Thread.sleep(100); + waitLoaded += 100; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1); + org.apache.flink.core.fs.Path sstRemotePath4 = + new org.apache.flink.core.fs.Path(remotePath, "4.sst"); + fileSystem.link(sstRemotePath1, sstRemotePath4); + assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1); + assertThat(fileSystem.exists(sstRemotePath4)).isTrue(); + fileSystem.delete(sstRemotePath1, false); + assertThat(fileSystem.exists(sstRemotePath1)).isFalse(); + assertThat(fileSystem.exists(sstRemotePath4)).isTrue(); + assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1); + assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L); + fileSystem.delete(sstRemotePath4, false); + assertThat(cacheEntry1.getReferenceCount()).isEqualTo(0); + assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(0L); } @Test
