This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62694fae763b9421dcb641f664cfc8a26ad71eb9
Author: Zakelly <[email protected]>
AuthorDate: Mon Mar 10 03:23:04 2025 +0800

    [FLINK-37437][state/forst] Bundle file cache entry with mapping entry and 
release properly
---
 .../flink/state/forst/fs/ForStFlinkFileSystem.java | 19 +++++-----------
 .../forst/fs/filemapping/FileMappingManager.java   | 23 ++++++++++++++------
 .../state/forst/fs/filemapping/MappingEntry.java   | 19 ++++++++++++++++
 .../state/forst/fs/FileMappingManagerTest.java     |  4 ++--
 .../state/forst/fs/ForStFlinkFileSystemTest.java   | 25 ++++++++++++++++++++++
 5 files changed, 67 insertions(+), 23 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index e69612ad127..afbc5a71505 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -191,7 +191,8 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
             Path dbFilePath, WriteMode overwriteMode) throws IOException {
         // Create a file in the mapping table
         MappingEntry createdMappingEntry =
-                fileMappingManager.createNewFile(dbFilePath, overwriteMode == 
WriteMode.OVERWRITE);
+                fileMappingManager.createNewFile(
+                        dbFilePath, overwriteMode == WriteMode.OVERWRITE, 
fileBasedCache);
 
         // The source must be backed by a file
         FileBackedMappingEntrySource source =
@@ -359,14 +360,7 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
 
     @Override
     public synchronized boolean delete(Path path, boolean recursive) throws 
IOException {
-        MappingEntry mappingEntry = 
fileMappingManager.mappingEntry(path.toString());
-        boolean success = fileMappingManager.deleteFileOrDirectory(path, 
recursive);
-        if (fileBasedCache != null && mappingEntry != null) {
-            // if mappingEntry is not null, it means it is a file, not 
directory
-            MappingEntrySource source = mappingEntry.getSource();
-            fileBasedCache.delete(source.getFilePath());
-        }
-        return success;
+        return fileMappingManager.deleteFileOrDirectory(path, recursive);
     }
 
     @Override
@@ -390,11 +384,8 @@ public class ForStFlinkFileSystem extends FileSystem 
implements Closeable {
     public synchronized void registerReusedRestoredFile(
             String key, StreamStateHandle stateHandle, Path dbFilePath) {
         MappingEntry mappingEntry =
-                fileMappingManager.registerReusedRestoredFile(key, 
stateHandle, dbFilePath);
-        if (fileBasedCache != null) {
-            fileBasedCache.registerInCache(
-                    mappingEntry.getSourcePath(), stateHandle.getStateSize());
-        }
+                fileMappingManager.registerReusedRestoredFile(
+                        key, stateHandle, dbFilePath, fileBasedCache);
     }
 
     public synchronized @Nullable MappingEntry getMappingEntry(Path path) {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
index a9709b06b1b..ca2ff8dbec4 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.state.forst.fs.cache.FileBasedCache;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -61,9 +62,10 @@ public class FileMappingManager {
     }
 
     /** Create a new file in the mapping table. */
-    public MappingEntry createNewFile(Path filePath, boolean overwrite) {
+    public MappingEntry createNewFile(Path filePath, boolean overwrite, 
FileBasedCache cache) {
         String key = filePath.toString();
-        if (FileOwnershipDecider.shouldAlwaysBeLocal(filePath)) {
+        boolean isLocal = FileOwnershipDecider.shouldAlwaysBeLocal(filePath);
+        if (isLocal) {
             filePath = forceLocalPath(filePath);
         }
 
@@ -71,23 +73,27 @@ public class FileMappingManager {
                 key,
                 toUUIDPath(filePath),
                 FileOwnershipDecider.decideForNewFile(filePath),
+                isLocal ? null : cache,
                 true,
                 overwrite);
     }
 
     /** Register a file restored from checkpoints to the mapping table. */
     public MappingEntry registerReusedRestoredFile(
-            String key, StreamStateHandle stateHandle, Path dbFilePath) {
+            String key, StreamStateHandle stateHandle, Path dbFilePath, 
FileBasedCache cache) {
         // The checkpoint file may contain only the UUID without the file 
extension, so we:
         //  - Decide file ownership based on dbFilePath, so we can know the 
real file type.
         //  - Add to mapping table based on cpFilePath, so we can access the 
real file.
         LOG.trace("decide restored file ownership based on dbFilePath: {}", 
dbFilePath);
         return addHandleBackedFileToMappingTable(
-                key, stateHandle, 
FileOwnershipDecider.decideForRestoredFile(dbFilePath));
+                key, stateHandle, 
FileOwnershipDecider.decideForRestoredFile(dbFilePath), cache);
     }
 
     private MappingEntry addHandleBackedFileToMappingTable(
-            String key, StreamStateHandle stateHandle, FileOwnership 
fileOwnership) {
+            String key,
+            StreamStateHandle stateHandle,
+            FileOwnership fileOwnership,
+            FileBasedCache cache) {
         MappingEntrySource source = new 
HandleBackedMappingEntrySource(stateHandle);
         MappingEntry existingEntry = mappingTable.getOrDefault(key, null);
         if (existingEntry != null) {
@@ -105,7 +111,8 @@ public class FileMappingManager {
             LOG.trace("Skip adding a file that already exists in mapping 
table: {}", key);
         }
         return existingEntry == null
-                ? addMappingEntry(key, new MappingEntry(1, source, 
fileOwnership, false, false))
+                ? addMappingEntry(
+                        key, new MappingEntry(1, source, fileOwnership, cache, 
false, false))
                 : existingEntry;
     }
 
@@ -113,6 +120,7 @@ public class FileMappingManager {
             String key,
             Path filePath,
             FileOwnership fileOwnership,
+            FileBasedCache cache,
             boolean writing,
             boolean overwrite) {
         MappingEntrySource source = new FileBackedMappingEntrySource(filePath);
@@ -144,7 +152,8 @@ public class FileMappingManager {
             }
         }
         return existingEntry == null
-                ? addMappingEntry(key, new MappingEntry(1, source, 
fileOwnership, false, writing))
+                ? addMappingEntry(
+                        key, new MappingEntry(1, source, fileOwnership, cache, 
false, writing))
                 : existingEntry;
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
index 692a7f7f1a2..130ff03b539 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.forst.fs.filemapping;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.asyncprocessing.ReferenceCounted;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.state.forst.fs.cache.FileBasedCache;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -28,6 +29,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+
 /**
  * A file mapping entry that encapsulates source and destination path. Source 
Path : dest Path = 1 :
  * N.
@@ -40,6 +43,8 @@ public class MappingEntry extends ReferenceCounted {
 
     FileOwnership fileOwnership;
 
+    final @Nullable FileBasedCache cache;
+
     final boolean isDirectory;
 
     volatile boolean writing;
@@ -56,6 +61,7 @@ public class MappingEntry extends ReferenceCounted {
                 initReference,
                 new HandleBackedMappingEntrySource(stateHandle),
                 fileOwnership,
+                null,
                 isDirectory,
                 false);
     }
@@ -66,6 +72,7 @@ public class MappingEntry extends ReferenceCounted {
                 initReference,
                 new FileBackedMappingEntrySource(sourcePath),
                 fileOwnership,
+                null,
                 isDirectory,
                 false);
     }
@@ -74,14 +81,23 @@ public class MappingEntry extends ReferenceCounted {
             int initReference,
             MappingEntrySource source,
             FileOwnership fileOwnership,
+            FileBasedCache cache,
             boolean isDirectory,
             boolean writing) {
         super(initReference);
         this.source = source;
         this.parentDir = null;
         this.fileOwnership = fileOwnership;
+        this.cache = cache;
         this.isDirectory = isDirectory;
         this.writing = writing;
+        if (!writing && cache != null && !isDirectory && source.cacheable()) {
+            try {
+                cache.registerInCache(source.getFilePath(), source.getSize());
+            } catch (IOException e) {
+                LOG.warn("Failed to register file {} in cache.", source, e);
+            }
+        }
     }
 
     public void setFileOwnership(FileOwnership ownership) {
@@ -133,6 +149,9 @@ public class MappingEntry extends ReferenceCounted {
                 return;
             }
             source.delete(isDirectory);
+            if (cache != null && !isDirectory && source.cacheable()) {
+                cache.delete(source.getFilePath());
+            }
         } catch (Exception e) {
             LOG.warn("Failed to delete file {}.", source, e);
         }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
index 662f006a26b..829e65077f9 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
@@ -53,9 +53,9 @@ public class FileMappingManagerTest {
     private MappingEntry registerFile(FileMappingManager manager, Path 
filePath) {
         if (reuseCp) {
             return manager.registerReusedRestoredFile(
-                    filePath.toString(), new FileStateHandle(filePath, 0), 
filePath);
+                    filePath.toString(), new FileStateHandle(filePath, 0), 
filePath, null);
         } else {
-            return manager.createNewFile(filePath, false);
+            return manager.createNewFile(filePath, false, null);
         }
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
index d2c4a83bbd9..96a6904db50 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
@@ -348,6 +348,31 @@ public class ForStFlinkFileSystemTest {
         
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L);
 
         is.close();
+
+        // test link and deleted by reference
+        long waitLoaded = 0L;
+        while (waitLoaded < 30000L && cacheEntry1.getReferenceCount() <= 0) {
+            try {
+                Thread.sleep(100);
+                waitLoaded += 100;
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
+        org.apache.flink.core.fs.Path sstRemotePath4 =
+                new org.apache.flink.core.fs.Path(remotePath, "4.sst");
+        fileSystem.link(sstRemotePath1, sstRemotePath4);
+        assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
+        assertThat(fileSystem.exists(sstRemotePath4)).isTrue();
+        fileSystem.delete(sstRemotePath1, false);
+        assertThat(fileSystem.exists(sstRemotePath1)).isFalse();
+        assertThat(fileSystem.exists(sstRemotePath4)).isTrue();
+        assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
+        
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L);
+        fileSystem.delete(sstRemotePath4, false);
+        assertThat(cacheEntry1.getReferenceCount()).isEqualTo(0);
+        
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(0L);
     }
 
     @Test

Reply via email to