This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f5f13ba9d4f1175d06ccc9eac8ac6c1979e4110
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Fri Feb 28 20:11:12 2025 +0800

    [FLINK-37379][state/forst] Double list LRU in ForSt state and cache reload
---
 docs/content.zh/docs/ops/metrics.md                |  10 +
 docs/content/docs/ops/metrics.md                   |  10 +
 .../shortcodes/generated/expert_forst_section.html |  12 +
 .../shortcodes/generated/forst_configuration.html  |  12 +
 .../state/forst/ForStKeyedStateBackendBuilder.java |   4 +
 .../org/apache/flink/state/forst/ForStOptions.java |  23 ++
 .../flink/state/forst/ForStResourceContainer.java  |   4 +
 .../flink/state/forst/ForStStateExecutor.java      |  47 ++-
 .../flink/state/forst/fs/ForStFlinkFileSystem.java |  33 +-
 .../forst/fs/cache/CachedDataInputStream.java      | 145 ++++++-
 .../forst/fs/cache/CachedDataOutputStream.java     |  15 +-
 .../flink/state/forst/fs/cache/DoubleListLru.java  | 442 +++++++++++++++++++++
 .../flink/state/forst/fs/cache/FileBasedCache.java | 294 ++++++++++++--
 .../flink/state/forst/fs/cache/FileCacheEntry.java | 269 ++++++++++++-
 .../flink/state/forst/fs/cache/LruCache.java       | 159 --------
 .../sync/ForStSyncKeyedStateBackendBuilder.java    |   4 +
 .../state/forst/fs/ForStFlinkFileSystemTest.java   |  38 +-
 .../state/forst/fs/cache/DoubleListLruTest.java    | 177 +++++++++
 18 files changed, 1458 insertions(+), 240 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md 
b/docs/content.zh/docs/ops/metrics.md
index b57c05e80ba..5becdd66353 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1410,6 +1410,16 @@ Besides that, we support the following metrics:
       <td>The remaining space in the volume for the configured cache. Only 
available when 'state.backend.forst.cache.reserve-size' is set above 0. </td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td>lru.evict</td>
+      <td>The number of cache files that are evicted from LRU.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>lru.loadback</td>
+      <td>The number of cache files that are loaded back from remote storage 
into the LRU. </td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 11af538bad4..253c68ec866 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1400,6 +1400,16 @@ Besides that, we support the following metrics:
       <td>The remaining space in the volume for the configured cache. Only 
available when 'state.backend.forst.cache.reserve-size' is set above 0. </td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td>lru.evict</td>
+      <td>The number of cache files that are evicted from LRU.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>lru.loadback</td>
+      <td>The number of cache files that are loaded back from remote storage 
into the LRU. </td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 
diff --git a/docs/layouts/shortcodes/generated/expert_forst_section.html 
b/docs/layouts/shortcodes/generated/expert_forst_section.html
index ae289ad4582..81ec1999458 100644
--- a/docs/layouts/shortcodes/generated/expert_forst_section.html
+++ b/docs/layouts/shortcodes/generated/expert_forst_section.html
@@ -8,6 +8,18 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            
<td><h5>state.backend.forst.cache.lru.access-before-promote</h5></td>
+            <td style="word-wrap: break-word;">6</td>
+            <td>Integer</td>
+            <td>When the number of accesses to a block in cold link reaches 
this value, the block will be promoted to the head of the LRU list and become a 
hot link. The evicted file in cache will be reloaded as well. The default value 
is '6'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.forst.cache.lru.promote-limit</h5></td>
+            <td style="word-wrap: break-word;">3</td>
+            <td>Integer</td>
+            <td>When the number of eviction that a block in hot link is moved 
to cold link reaches this value, the block will be blocked from being promoted 
to the head of the LRU list. The default value is '3'.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.forst.executor.inline-coordinator</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/docs/layouts/shortcodes/generated/forst_configuration.html 
b/docs/layouts/shortcodes/generated/forst_configuration.html
index 704d549635b..b4abb2835bd 100644
--- a/docs/layouts/shortcodes/generated/forst_configuration.html
+++ b/docs/layouts/shortcodes/generated/forst_configuration.html
@@ -14,6 +14,18 @@
             <td>String</td>
             <td>The directory where ForSt caches its SST files, fallback to 
the subdirectory of '/cache' under the value of 'state.backend.forst.local-dir' 
if not configured.</td>
         </tr>
+        <tr>
+            
<td><h5>state.backend.forst.cache.lru.access-before-promote</h5></td>
+            <td style="word-wrap: break-word;">6</td>
+            <td>Integer</td>
+            <td>When the number of accesses to a block in cold link reaches 
this value, the block will be promoted to the head of the LRU list and become a 
hot link. The evicted file in cache will be reloaded as well. The default value 
is '6'.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.forst.cache.lru.promote-limit</h5></td>
+            <td style="word-wrap: break-word;">3</td>
+            <td>Integer</td>
+            <td>When the number of eviction that a block in hot link is moved 
to cold link reaches this value, the block will be blocked from being promoted 
to the head of the LRU list. The default value is '3'.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.forst.cache.reserve-size</h5></td>
             <td style="word-wrap: break-word;">0 bytes</td>
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 1c03aadf8c1..67aa91371c2 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -76,6 +76,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static 
org.apache.flink.state.forst.ForStConfigurableOptions.WRITE_BATCH_SIZE;
+import static 
org.apache.flink.state.forst.fs.cache.FileBasedCache.setFlinkThread;
 
 /**
  * Builder class for {@link ForStKeyedStateBackend} which handles all 
necessary initializations and
@@ -247,6 +248,9 @@ public class ForStKeyedStateBackendBuilder<K>
         PriorityQueueSetFactory priorityQueueFactory;
 
         try {
+            // Current thread (task thread) must be a Flink thread to enable 
proper cache
+            // management.
+            setFlinkThread();
             optionsContainer.prepareDirectories();
             restoreOperation =
                     getForStRestoreOperation(
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
index cdf5ee6db03..b61d561b532 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
@@ -107,6 +107,29 @@ public class ForStOptions {
                                             text(MemorySize.ZERO.toString()))
                                     .build());
 
+    @Documentation.Section(Documentation.Sections.EXPERT_FORST)
+    public static final ConfigOption<Integer> 
CACHE_LRU_ACCESS_BEFORE_PROMOTION =
+            
ConfigOptions.key("state.backend.forst.cache.lru.access-before-promote")
+                    .intType()
+                    .defaultValue(6)
+                    .withDescription(
+                            "When the number of accesses to "
+                                    + "a block in cold link reaches this 
value, the block will "
+                                    + "be promoted to the head of the LRU list 
and become a hot link. "
+                                    + "The evicted file in cache will be 
reloaded as well. "
+                                    + "The default value is '6'.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_FORST)
+    public static final ConfigOption<Integer> CACHE_LRU_PROMOTION_LIMIT =
+            ConfigOptions.key("state.backend.forst.cache.lru.promote-limit")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "When the number of eviction that a block in hot 
link "
+                                    + "is moved to cold link reaches this 
value, the block will be blocked "
+                                    + "from being promoted to the head of the 
LRU list. "
+                                    + "The default value is '3'.");
+
     /** The options factory class for ForSt to create DBOptions and 
ColumnFamilyOptions. */
     @Documentation.Section(Documentation.Sections.EXPERT_FORST)
     public static final ConfigOption<String> OPTIONS_FACTORY =
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index e4e9d9c3bb6..3946c38980b 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -386,6 +386,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
                             remoteForStPath.toUri(),
                             localForStPath,
                             ForStFlinkFileSystem.getFileBasedCache(
+                                    configuration,
                                     cacheBasePath,
                                     remoteForStPath,
                                     cacheCapacity,
@@ -456,6 +457,9 @@ public final class ForStResourceContainer implements 
AutoCloseable {
             sharedResources.close();
         }
         cleanRelocatedDbLogs();
+        if (forStFileSystem != null) {
+            forStFileSystem.close();
+        }
     }
 
     /**
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index 44814efb496..87cb07ede0e 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.forst;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
 import org.apache.flink.runtime.asyncprocessing.StateRequest;
 import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
+import org.apache.flink.state.forst.fs.cache.FileBasedCache;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -93,13 +94,15 @@ public class ForStStateExecutor implements StateExecutor {
                     coordinatorInline
                             ? directExecutor
                             : Executors.newSingleThreadExecutor(
-                                    new ExecutorThreadFactory(
-                                            
"ForSt-StateExecutor-Coordinator-And-Write"));
+                                    new ForStExecutorThreadFactory(
+                                            
"ForSt-StateExecutor-Coordinator-And-Write",
+                                            FileBasedCache::setFlinkThread));
             this.readThreadCount = readIoParallelism;
             this.readThreads =
                     Executors.newFixedThreadPool(
                             readIoParallelism,
-                            new 
ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
+                            new ForStExecutorThreadFactory(
+                                    "ForSt-StateExecutor-read-IO", 
FileBasedCache::setFlinkThread));
             this.writeThreads = directExecutor;
             this.sharedWriteThread = true;
         } else {
@@ -108,13 +111,16 @@ public class ForStStateExecutor implements StateExecutor {
                     coordinatorInline
                             ? directExecutor
                             : Executors.newSingleThreadExecutor(
-                                    new 
ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
+                                    new ForStExecutorThreadFactory(
+                                            "ForSt-StateExecutor-Coordinator",
+                                            FileBasedCache::setFlinkThread));
             if (readIoParallelism <= 0 || writeIoParallelism <= 0) {
                 this.readThreadCount = Math.max(readIoParallelism, 
writeIoParallelism);
                 this.readThreads =
                         Executors.newFixedThreadPool(
                                 readThreadCount,
-                                new 
ExecutorThreadFactory("ForSt-StateExecutor-IO"));
+                                new ForStExecutorThreadFactory(
+                                        "ForSt-StateExecutor-IO", 
FileBasedCache::setFlinkThread));
                 this.writeThreads = readThreads;
                 this.sharedWriteThread = true;
             } else {
@@ -122,11 +128,15 @@ public class ForStStateExecutor implements StateExecutor {
                 this.readThreads =
                         Executors.newFixedThreadPool(
                                 readIoParallelism,
-                                new 
ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
+                                new ForStExecutorThreadFactory(
+                                        "ForSt-StateExecutor-read-IO",
+                                        FileBasedCache::setFlinkThread));
                 this.writeThreads =
                         Executors.newFixedThreadPool(
                                 writeIoParallelism,
-                                new 
ExecutorThreadFactory("ForSt-StateExecutor-write-IO"));
+                                new ForStExecutorThreadFactory(
+                                        "ForSt-StateExecutor-write-IO",
+                                        FileBasedCache::setFlinkThread));
                 this.sharedWriteThread = false;
             }
         }
@@ -306,4 +316,27 @@ public class ForStStateExecutor implements StateExecutor {
             executorService.shutdownNow();
         }
     }
+
+    /**
+     * An {@link ExecutorThreadFactory} that could run an initializer before 
running the actual
+     * runnable for each created thread.
+     */
+    private static class ForStExecutorThreadFactory extends 
ExecutorThreadFactory {
+
+        private final Runnable initializer;
+
+        public ForStExecutorThreadFactory(String name, Runnable initializer) {
+            super(name);
+            this.initializer = initializer;
+        }
+
+        @Override
+        public Thread newThread(Runnable runnable) {
+            return super.newThread(
+                    () -> {
+                        initializer.run();
+                        runnable.run();
+                    });
+        }
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index aea5c3d4d43..f3f8334cc4d 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.forst.fs;
 
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -59,7 +61,7 @@ import java.util.List;
  * <p>All methods in this class maybe used by ForSt, please start a discussion 
firstly if it has to
  * be modified.
  */
-public class ForStFlinkFileSystem extends FileSystem {
+public class ForStFlinkFileSystem extends FileSystem implements Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ForStFlinkFileSystem.class);
 
@@ -108,6 +110,7 @@ public class ForStFlinkFileSystem extends FileSystem {
     }
 
     public static FileBasedCache getFileBasedCache(
+            ReadableConfig config,
             Path cacheBase,
             Path remoteForStPath,
             long cacheCapacity,
@@ -140,11 +143,7 @@ public class ForStFlinkFileSystem extends FileSystem {
                             new File(cacheBase.toString()), cacheReservedSize, 
SST_FILE_SIZE);
         }
         return new FileBasedCache(
-                Integer.MAX_VALUE,
-                cacheLimitPolicy,
-                cacheBase.getFileSystem(),
-                cacheBase,
-                metricGroup);
+                config, cacheLimitPolicy, cacheBase.getFileSystem(), 
cacheBase, metricGroup);
     }
 
     public FileSystem getDelegateFS() {
@@ -317,10 +316,12 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     @Override
     public synchronized boolean delete(Path path, boolean recursive) throws 
IOException {
+        MappingEntry mappingEntry = 
fileMappingManager.mappingEntry(path.toString());
         boolean success = fileMappingManager.deleteFileOrDirectory(path, 
recursive);
-        if (fileBasedCache != null) {
-            // only new generated file will put into cache, no need to 
consider file mapping
-            fileBasedCache.delete(path);
+        if (fileBasedCache != null && mappingEntry != null) {
+            // if mappingEntry is not null, it means it is a file, not 
directory
+            MappingEntrySource source = mappingEntry.getSource();
+            fileBasedCache.delete(source.getFilePath());
         }
         return success;
     }
@@ -345,7 +346,12 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     public synchronized void registerReusedRestoredFile(
             String key, StreamStateHandle stateHandle, Path dbFilePath) {
-        fileMappingManager.registerReusedRestoredFile(key, stateHandle, 
dbFilePath);
+        MappingEntry mappingEntry =
+                fileMappingManager.registerReusedRestoredFile(key, 
stateHandle, dbFilePath);
+        if (fileBasedCache != null) {
+            fileBasedCache.registerInCache(
+                    mappingEntry.getSourcePath(), stateHandle.getStateSize());
+        }
     }
 
     public synchronized @Nullable MappingEntry getMappingEntry(Path path) {
@@ -378,6 +384,13 @@ public class ForStFlinkFileSystem extends FileSystem {
                 : fileBasedCache.open(source.getFilePath(), inputStream);
     }
 
+    @Override
+    public void close() throws IOException {
+        if (fileBasedCache != null) {
+            fileBasedCache.close();
+        }
+    }
+
     public static class FileStatusWrapper implements FileStatus {
         private final FileStatus delegate;
         private final Path path;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java
index 8b8bd6f8df2..774cb65f74e 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java
@@ -28,6 +28,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Semaphore;
 
+import static 
org.apache.flink.state.forst.fs.cache.FileBasedCache.isFlinkThread;
+
 /**
  * A {@link FSDataInputStream} delegates requests to other one and supports 
reading data with {@link
  * ByteBuffer}. One CachedDataInputStream only supports one thread reading 
which is guaranteed by
@@ -56,38 +58,137 @@ public class CachedDataInputStream extends 
FSDataInputStream implements ByteBuff
 
     private Semaphore semaphore;
 
+    private final FileBasedCache fileBasedCache;
+
+    private boolean closed = false;
+
     public CachedDataInputStream(
+            FileBasedCache fileBasedCache,
             FileCacheEntry cacheEntry,
             FSDataInputStream cacheStream,
             FSDataInputStream originalStream) {
+        this.fileBasedCache = fileBasedCache;
         this.cacheEntry = cacheEntry;
         this.fsdis = cacheStream;
         this.originalStream = originalStream;
         this.streamStatus = StreamStatus.CACHED_OPEN;
         this.semaphore = new Semaphore(0);
+        LOG.trace("Create CachedDataInputStream for {} with CACHED_OPEN", 
cacheEntry.cachePath);
+    }
+
+    public CachedDataInputStream(
+            FileBasedCache fileBasedCache,
+            FileCacheEntry cacheEntry,
+            FSDataInputStream originalStream) {
+        this.fileBasedCache = fileBasedCache;
+        this.cacheEntry = cacheEntry;
+        this.fsdis = null;
+        this.originalStream = originalStream;
+        this.streamStatus = StreamStatus.CACHED_CLOSED;
+        this.semaphore = new Semaphore(0);
+        LOG.trace("Create CachedDataInputStream for {} with CACHED_CLOSED", 
cacheEntry.cachePath);
     }
 
+    /**
+     * Retrieves the appropriate input stream for reading data. This method 
attempts to use the
+     * cached stream if it is available and valid. If the cached stream is not 
available, it falls
+     * back to the original stream. The method also handles the transition 
between cached and
+     * original streams based on the current status of the stream.
+     *
+     * @return the input stream to be used for reading data
+     * @throws IOException if an I/O error occurs while accessing the stream
+     */
     private FSDataInputStream getStream() throws IOException {
-        if (streamStatus == StreamStatus.CACHED_OPEN && cacheEntry.tryRetain() 
> 0) {
-            return fsdis;
-        } else if (streamStatus != StreamStatus.ORIGINAL) {
-            try {
-                semaphore.acquire(1);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+        if (isFlinkThread()) {
+            cacheEntry.touch();
+        }
+        FSDataInputStream stream = tryGetCacheStream();
+        if (stream != null) {
+            fileBasedCache.incHitCounter();
+            return stream;
+        }
+
+        if (streamStatus == StreamStatus.CACHED_CLOSED
+                || streamStatus == StreamStatus.CACHED_CLOSING) {
+            if (streamStatus == StreamStatus.CACHED_CLOSING) {
+                try {
+                    semaphore.acquire(1);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                originalStream.seek(position);
+                position = -1;
+                LOG.trace(
+                        "Stream {} status from {} to {}",
+                        cacheEntry.cachePath,
+                        streamStatus,
+                        StreamStatus.CACHED_CLOSED);
+                streamStatus = StreamStatus.CACHED_CLOSED;
+            }
+            // try reopen
+            tryReopen();
+            stream = tryGetCacheStream();
+            if (stream != null) {
+                fileBasedCache.incHitCounter();
+                return stream;
             }
-            originalStream.seek(position);
-            position = -1;
-            streamStatus = StreamStatus.ORIGINAL;
+            fileBasedCache.incMissCounter();
+            return originalStream;
+        } else if (streamStatus == StreamStatus.ORIGINAL) {
+            fileBasedCache.incMissCounter();
             return originalStream;
         } else {
+            if (streamStatus == StreamStatus.CACHED_OPEN) {
+                stream = tryGetCacheStream();
+                if (stream != null) {
+                    fileBasedCache.incHitCounter();
+                    return stream;
+                }
+            }
+            fileBasedCache.incMissCounter();
             return originalStream;
         }
     }
 
-    private void closeStream() throws IOException {
+    private FSDataInputStream tryGetCacheStream() {
+        if (streamStatus == StreamStatus.CACHED_OPEN && cacheEntry.tryRetain() 
> 0) {
+            return fsdis;
+        }
+        return null;
+    }
+
+    private void tryReopen() {
+        if (streamStatus == StreamStatus.CACHED_CLOSED && isFlinkThread()) {
+            try {
+                fsdis = cacheEntry.getCacheStream();
+                if (fsdis != null) {
+                    LOG.trace(
+                            "Stream {} status from {} to {}",
+                            cacheEntry.cachePath,
+                            streamStatus,
+                            StreamStatus.CACHED_OPEN);
+                    fsdis.seek(originalStream.getPos());
+                    streamStatus = StreamStatus.CACHED_OPEN;
+                    cacheEntry.release();
+                }
+            } catch (IOException e) {
+                LOG.warn("Reopen stream error.", e);
+            }
+        }
+    }
+
+    /**
+     * Closes the cached stream if it is open. Note that this might be invoked 
by different threads,
+     * the user thread or the cache eviction (async) thread.
+     */
+    synchronized void closeCachedStream() throws IOException {
         if (streamStatus == StreamStatus.CACHED_OPEN) {
-            streamStatus = StreamStatus.CACHED_CLOSED;
+            LOG.trace(
+                    "Stream {} status from {} to {}",
+                    cacheEntry.cachePath,
+                    streamStatus,
+                    StreamStatus.CACHED_CLOSING);
+            streamStatus = StreamStatus.CACHED_CLOSING;
             position = fsdis.getPos();
             fsdis.close();
             fsdis = null;
@@ -166,7 +267,15 @@ public class CachedDataInputStream extends 
FSDataInputStream implements ByteBuff
 
     @Override
     public void close() throws IOException {
-        closeStream();
+        if (closed) {
+            return;
+        }
+        closed = true;
+        closeCachedStream();
+    }
+
+    public boolean isClosed() {
+        return closed;
     }
 
     @Override
@@ -252,10 +361,18 @@ public class CachedDataInputStream extends 
FSDataInputStream implements ByteBuff
         return n;
     }
 
-    /** The status of the underlying stream. */
+    /** The status of the underlying cache stream. */
     enum StreamStatus {
+        /** The cached stream is open and available for reading. */
         CACHED_OPEN,
+
+        /** The cached stream is in the process of closing. */
+        CACHED_CLOSING,
+
+        /** The cached stream is closed and not available for reading. */
         CACHED_CLOSED,
+
+        /** The original stream is being used instead of the cached stream. */
         ORIGINAL
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataOutputStream.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataOutputStream.java
index f398801efa2..51a781e7194 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataOutputStream.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataOutputStream.java
@@ -21,14 +21,21 @@ package org.apache.flink.state.forst.fs.cache;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
  * A {@link FSDataOutputStream} delegates requests to other one and supports 
writing data with
- * {@link ByteBuffer}.
+ * {@link ByteBuffer}. The data will be written to the original output stream 
and the cache output
+ * stream. When the stream is closed, the data will be put into the cache and 
ready to be read.
  */
 public class CachedDataOutputStream extends FSDataOutputStream {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CachedDataOutputStream.class);
+
     /** The original path of file. */
     private final Path originalPath;
 
@@ -52,6 +59,7 @@ public class CachedDataOutputStream extends 
FSDataOutputStream {
         this.cachePath = cachePath;
         this.cacheOutputStream = cacheOutputStream;
         this.fileBasedCache = cache;
+        LOG.trace("Create CachedDataOutputStream for {} and {}", originalPath, 
cachePath);
     }
 
     @Override
@@ -105,6 +113,9 @@ public class CachedDataOutputStream extends 
FSDataOutputStream {
         long thisSize = cacheOutputStream.getPos();
         FileCacheEntry fileCacheEntry =
                 new FileCacheEntry(fileBasedCache, originalPath, cachePath, 
thisSize);
-        fileBasedCache.put(cachePath.toString(), fileCacheEntry);
+        fileCacheEntry.switchStatus(
+                FileCacheEntry.EntryStatus.REMOVED, 
FileCacheEntry.EntryStatus.LOADED);
+        fileCacheEntry.loaded();
+        fileBasedCache.addFirst(cachePath.toString(), fileCacheEntry);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/DoubleListLru.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/DoubleListLru.java
new file mode 100644
index 00000000000..d3d011a336d
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/DoubleListLru.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs.cache;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A double link LRU (Least Recently Used) cache implementation. This cache 
maintains two linked
+ * lists to manage the cache entries. The first list contains the most 
recently used entries, and
+ * the second list contains the less recently used entries. The cache also 
maintains a middle
+ * pointer to efficiently manage the entries. No thread-safe guarantees are 
provided.
+ *
+ * @param <K> the type of keys maintained by this cache
+ * @param <V> the type of mapped values
+ */
+public abstract class DoubleListLru<K, V> implements Iterable<Tuple2<K, V>> {
+
+    class Node {
+        K key;
+        V value;
+        Node prev;
+        Node next;
+        boolean isBeforeMiddle;
+
+        Node(K key, V value) {
+            this.key = key;
+            this.value = value;
+            this.isBeforeMiddle = false;
+        }
+    }
+
+    private Node head;
+    private Node tail;
+    private Node middle;
+    private int size;
+    private int secondSize;
+    private final HashMap<K, Node> map;
+
+    public DoubleListLru() {
+        this.head = null;
+        this.tail = null;
+        this.middle = null;
+        this.size = 0;
+        this.map = new HashMap<>();
+    }
+
+    // -------------------
+    // Hook methods
+    // -------------------
+
+    /**
+     * Checks if it is safe to add a value to the first list. Will be called 
before adding a value
+     * to the first list.
+     *
+     * @param value the value to be added to the first list
+     * @return true if it is safe to add the value to the first list, false 
otherwise.
+     */
+    abstract boolean isSafeToAddFirst(V value);
+
+    /**
+     * Hook method called when a new node is created.
+     *
+     * @param value the value of the new node
+     * @param n the new node
+     */
+    abstract void newNodeCreated(V value, Node n);
+
+    /**
+     * Hook method called when a value is added to the first list.
+     *
+     * @param value the value added to the first list
+     */
+    abstract void addedToFirst(V value);
+
+    /**
+     * Hook method called when a value is added to the second list.
+     *
+     * @param value the value added to the second list
+     */
+    abstract void addedToSecond(V value);
+
+    /**
+     * Hook method called when a value is removed from the first list.
+     *
+     * @param value the value removed from the first list
+     */
+    abstract void removedFromFirst(V value);
+
+    /**
+     * Hook method called when a value is removed from the second list.
+     *
+     * @param value the value removed from the second list
+     */
+    abstract void removedFromSecond(V value);
+
+    /**
+     * Hook method called when a value is moved to the first list.
+     *
+     * @param value the value moved to the first list
+     */
+    abstract void movedToFirst(V value);
+
+    /**
+     * Hook method called when a value is moved to the second list.
+     *
+     * @param value the value moved to the second list
+     */
+    abstract void movedToSecond(V value);
+
+    /**
+     * Hook method called when a node is accessed in the second list.
+     *
+     * @param value the value of the accessed node
+     * @return true if the node should be promoted to the first list, false 
otherwise
+     */
+    abstract boolean nodeAccessedAtSecond(V value);
+
+    /**
+     * Hook method called when a value is promoted to the first list.
+     *
+     * @param value the promoted value
+     */
+    abstract void promotedToFirst(V value);
+
+    /**
+     * Adds a new entry to the front of the cache, i.e. as the head of the first list. If {@code
+     * isSafeToAddFirst} rejects the value, the entry is handed to {@code addSecond} instead and
+     * nothing else happens here.
+     *
+     * @param key the key of the entry
+     * @param value the value of the entry
+     */
+    public void addFirst(K key, V value) {
+        if (!isSafeToAddFirst(value)) {
+            addSecond(key, value);
+            return;
+        }
+        Node newNode = new Node(key, value);
+        newNodeCreated(value, newNode);
+        // NOTE(review): an existing mapping for the key is replaced without unlinking the old
+        // node from the list; presumably callers never add the same key twice - confirm.
+        map.put(key, newNode);
+        if (head == null) {
+            // Empty cache: the new node is both head and tail; middle stays null.
+            head = tail = newNode;
+        } else {
+            newNode.next = head;
+            head.prev = newNode;
+            head = newNode;
+        }
+        newNode.isBeforeMiddle = true;
+        size++;
+        // Notify the subclass only after the node is fully linked.
+        addedToFirst(value);
+    }
+
+    /**
+     * Moves the middle pointer one node towards the tail. The node that was the head of the
+     * second list stays where it is but is now flagged as belonging to the first list; {@code
+     * secondSize} is decremented and {@code movedToFirst} is invoked with its value. Does nothing
+     * when the second list is empty ({@code middle == null}).
+     */
+    public void moveMiddleBack() {
+        if (middle != null) {
+            middle.isBeforeMiddle = true;
+            // Remember the value before the pointer advances (possibly to null).
+            V theValue = middle.value;
+            middle = middle.next;
+            secondSize--;
+            movedToFirst(theValue);
+        }
+    }
+
+    /**
+     * Moves the middle pointer one node towards the head, so that the last node of the first
+     * list becomes the head of the second list. The node is flagged as being in the second list,
+     * {@code secondSize} is incremented and {@code movedToSecond} is invoked with its value. Does
+     * nothing when the middle is already the head or when the cache is empty.
+     */
+    public void moveMiddleFront() {
+        if (middle != null && middle.prev != null) {
+            middle = middle.prev;
+            middle.isBeforeMiddle = false;
+            secondSize++;
+            movedToSecond(middle.value);
+        } else if (middle == null && size > 0) {
+            // The second list is empty: the tail of the first list starts the second list.
+            middle = tail;
+            middle.isBeforeMiddle = false;
+            secondSize++;
+            movedToSecond(middle.value);
+        }
+    }
+
+    /**
+     * Inserts a new entry at the middle of the cache, i.e. as the new head of the second list.
+     * The new node becomes the middle pointer, and {@code addedToSecond} is invoked once it is
+     * linked.
+     *
+     * @param key the key of the entry
+     * @param value the value of the entry
+     */
+    public void addSecond(K key, V value) {
+        Node newNode = new Node(key, value);
+        newNodeCreated(value, newNode);
+        // NOTE(review): an existing mapping for the key is replaced without unlinking the old
+        // node from the list; presumably callers never add the same key twice - confirm.
+        map.put(key, newNode);
+        if (head == null) {
+            // Empty cache: the node is head, tail and middle at once.
+            head = tail = middle = newNode;
+        } else if (middle == null) {
+            // Only the first list has entries: append after the tail and start the second list.
+            newNode.prev = tail;
+            tail.next = newNode;
+            tail = newNode;
+            middle = newNode;
+        } else {
+            // Link the node right in front of the current middle.
+            newNode.next = middle;
+            newNode.prev = middle.prev;
+            if (middle.prev != null) {
+                middle.prev.next = newNode;
+            } else {
+                // head == middle
+                head = newNode;
+            }
+            middle.prev = newNode;
+            middle = newNode;
+        }
+        newNode.isBeforeMiddle = false;
+        secondSize++;
+        size++;
+        addedToSecond(value);
+    }
+
+    /**
+     * Returns the value of the middle entry in the cache.
+     *
+     * @return the value of the middle entry, or null if the cache is empty
+     */
+    @VisibleForTesting
+    V getMiddle() {
+        return middle != null ? middle.value : null;
+    }
+
+    /**
+     * Looks up the value stored under the given key. When {@code affectOrder} is set, the lookup
+     * counts as an access and may reorder the entries of the cache; otherwise the lists are left
+     * untouched.
+     *
+     * @param key the key of the entry
+     * @param affectOrder true if the order of the entries should be affected, false otherwise
+     * @return the value associated with the key, or null if the key is not found
+     */
+    public V get(K key, boolean affectOrder) {
+        Node found = map.get(key);
+        if (found == null) {
+            return null;
+        }
+        if (affectOrder) {
+            accessNode(found);
+        }
+        return found.value;
+    }
+
+    /**
+     * Removes the entry associated with the specified key from the cache. The node is unlinked
+     * from the list, the middle pointer is advanced if it pointed at the node, and {@code
+     * removedFromFirst} or {@code removedFromSecond} is invoked depending on the list the node
+     * belonged to.
+     *
+     * @param key the key of the entry to be removed
+     * @return the value of the removed entry, or null if the key is not found
+     */
+    public V remove(K key) {
+        Node node = map.get(key);
+        if (node == null) {
+            return null;
+        }
+        if (node == head || node == tail) {
+            // Both checks apply when the node is the only element of the list.
+            if (node == head) {
+                head = node.next;
+                if (head != null) {
+                    head.prev = null;
+                }
+            }
+            if (node == tail) {
+                tail = node.prev;
+                if (tail != null) {
+                    tail.next = null;
+                }
+            }
+        } else {
+            // An inner node always has both neighbours.
+            node.prev.next = node.next;
+            node.next.prev = node.prev;
+        }
+        if (node == middle) {
+            // The successor (possibly null) becomes the new head of the second list.
+            middle = node.next;
+        }
+        map.remove(key);
+        size--;
+        if (node.isBeforeMiddle) {
+            removedFromFirst(node.value);
+        } else {
+            secondSize--;
+            removedFromSecond(node.value);
+        }
+        return node.value;
+    }
+
+    /**
+     * Records an access to the given node. A node of the first list is moved to the head. A node
+     * of the second list is moved to the head of the second list (the middle); if both {@code
+     * nodeAccessedAtSecond} and {@code isSafeToAddFirst} agree, it is then promoted to the head of
+     * the first list and {@code promotedToFirst} is invoked.
+     *
+     * @param node the accessed node
+     */
+    void accessNode(Node node) {
+        if (node.isBeforeMiddle) {
+            moveToFront(node);
+        } else {
+            moveToMiddle(node);
+            if (nodeAccessedAtSecond(node.value) && 
isSafeToAddFirst(node.value)) {
+                // The node is the middle now, so advancing the middle pointer hands it over to
+                // the first list (and fires movedToFirst) before it is relinked at the head.
+                moveMiddleBack();
+                moveToFront(node);
+                promotedToFirst(node.value);
+            }
+        }
+    }
+
+    /**
+     * Moves the specified node to the front of the cache, making it the new head. The node is
+     * expected to belong to the first list (checked by an assertion only).
+     *
+     * @param node the node to be moved to the front
+     */
+    private void moveToFront(Node node) {
+        assert node.isBeforeMiddle;
+        if (node == head) {
+            return;
+        }
+        // Unlink the node; it is not the head, so it has a predecessor.
+        if (node == tail) {
+            tail = node.prev;
+            tail.next = null;
+        } else {
+            node.prev.next = node.next;
+            node.next.prev = node.prev;
+        }
+        // Relink it in front of the current head.
+        node.prev = null;
+        node.next = head;
+        head.prev = node;
+        head = node;
+    }
+
+    /**
+     * Moves the specified node to the middle of the cache, making it the new head of the second
+     * list. The node is expected to belong to the second list (checked by an assertion only).
+     *
+     * @param node the node to be moved to the middle
+     */
+    private void moveToMiddle(Node node) {
+        assert !node.isBeforeMiddle;
+        if (node == middle) {
+            return;
+        }
+        // Unlink the node; assuming it is in the second list and not the middle, it lies behind
+        // the middle and therefore has a predecessor.
+        if (node == tail) {
+            tail = node.prev;
+            tail.next = null;
+        } else {
+            node.prev.next = node.next;
+            node.next.prev = node.prev;
+        }
+        // Relink it right in front of the current middle.
+        node.next = middle;
+        node.prev = middle.prev;
+        if (middle.prev != null) {
+            middle.prev.next = node;
+        } else {
+            // head == middle
+            head = node;
+        }
+        middle.prev = node;
+        middle = node;
+    }
+
+    /**
+     * Returns the number of entries in the cache.
+     *
+     * @return the number of entries in the cache
+     */
+    public int size() {
+        return size;
+    }
+
+    public int getSecondSize() {
+        return secondSize;
+    }
+
+    /**
+     * Checks if the cache is empty.
+     *
+     * @return true if the cache is empty, false otherwise
+     */
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    // ----------------------
+    // Iterators
+    // ----------------------
+
+    @Override
+    public Iterator<Tuple2<K, V>> iterator() {
+        return new LruIterator();
+    }
+
+    public Iterator<Tuple2<K, V>> descendingIterator() {
+        return new DecendingLruIterator();
+    }
+
+    private class LruIterator implements Iterator<Tuple2<K, V>> {
+        private Node current = head;
+
+        @Override
+        public boolean hasNext() {
+            return current != null;
+        }
+
+        @Override
+        public Tuple2<K, V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            Tuple2<K, V> entry = Tuple2.of(current.key, current.value);
+            current = current.next;
+            return entry;
+        }
+    }
+
+    private class DecendingLruIterator implements Iterator<Tuple2<K, V>> {
+        private Node current = tail;
+
+        @Override
+        public boolean hasNext() {
+            return current != null;
+        }
+
+        @Override
+        public Tuple2<K, V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            Tuple2<K, V> entry = Tuple2.of(current.key, current.value);
+            current = current.prev;
+            return entry;
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileBasedCache.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileBasedCache.java
index 5d3f5824a8e..a732bb05341 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileBasedCache.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileBasedCache.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.state.forst.fs.cache;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -25,59 +27,108 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.state.forst.ForStOptions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * A file-granularity LRU cache. Only newly generated SSTs are written to the 
cache, the file
  * reading from the remote will not. Newly generated SSTs are written to the 
original file system
  * and cache simultaneously, so, the cached file can be directly deleted with 
persisting when
- * evicting.
+ * evicting. The {@link FileBasedCache}, {@link FileCacheEntry}, {@link 
CachedDataInputStream}, and
+ * {@link CachedDataOutputStream} classes work together to implement a 
file-based caching mechanism
+ * in ForSt State Backend.
+ * <li>FileBasedCache manages multiple FileCacheEntry instances.
+ * <li>Each FileCacheEntry represents a cached file and can open 
CachedDataInputStream for reading
+ *     the file.
+ * <li>CachedDataInputStream instances are created by FileCacheEntry and can 
read data from either
+ *     the cached file or the original file, depending on the cache entry's 
state. It has internal
+ *     stream status to indicate the current reading source.
+ * <li>CachedDataOutputStream instances are created by FileBasedCache and 
write data to both the
+ *     original and cached files, creating a new cache entry in the cache when 
the writing is
+ *     finished.
  */
-public class FileBasedCache extends LruCache<String, FileCacheEntry> {
+public final class FileBasedCache extends DoubleListLru<String, FileCacheEntry>
+        implements Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(FileBasedCache.class);
 
     private static final String FORST_CACHE_PREFIX = "forst.fileCache";
 
+    private static final ThreadLocal<Boolean> isFlinkThread = 
ThreadLocal.withInitial(() -> false);
+
+    private final CacheLimitPolicy cacheLimitPolicy;
+
     /** The file system of cache. */
     final FileSystem cacheFs;
 
     /** The base path of cache. */
     private final Path basePath;
 
+    /** The number of times a file is accessed before it is promoted to the 
first link. */
+    private final int accessBeforePromote;
+
+    /**
+     * The threshold count of promotion times, beyond which the file will be 
blocked out of first
+     * link.
+     */
+    private final int promoteLimit;
+
     /** Whether the cache is closed. */
     private volatile boolean closed;
 
-    private MetricGroup metricGroup;
+    /** Executor service for handling cache operations. */
+    private final ExecutorService executorService;
 
     /** Hit metric. */
-    private transient Counter hitCounter;
+    private Counter hitCounter;
 
     /** Miss metric. */
-    private transient Counter missCounter;
+    private Counter missCounter;
+
+    /** Metric for load back. */
+    private Counter loadBackCounter;
+
+    /** Metric for eviction. */
+    private Counter evictCounter;
+
+    /** Epoch for second link access. */
+    private long secondAccessEpoch = 0L;
 
     public FileBasedCache(
-            int capacity,
+            ReadableConfig configuration,
             CacheLimitPolicy cacheLimitPolicy,
             FileSystem cacheFs,
             Path basePath,
             MetricGroup metricGroup) {
-        super(capacity, cacheLimitPolicy);
         this.closed = false;
+        this.cacheLimitPolicy = cacheLimitPolicy;
         this.cacheFs = cacheFs;
         this.basePath = basePath;
+        this.accessBeforePromote =
+                Math.max(1, 
configuration.get(ForStOptions.CACHE_LRU_ACCESS_BEFORE_PROMOTION));
+        this.promoteLimit = 
configuration.get(ForStOptions.CACHE_LRU_PROMOTION_LIMIT);
+        this.executorService =
+                Executors.newFixedThreadPool(4, new 
ExecutorThreadFactory("ForSt-LruLoader"));
         if (metricGroup != null) {
-            this.metricGroup = metricGroup;
             this.hitCounter =
                     metricGroup.counter(FORST_CACHE_PREFIX + ".hit", new 
ThreadSafeSimpleCounter());
             this.missCounter =
                     metricGroup.counter(
                             FORST_CACHE_PREFIX + ".miss", new 
ThreadSafeSimpleCounter());
-            metricGroup.gauge(
-                    FORST_CACHE_PREFIX + ".usedBytes", () -> 
cacheLimitPolicy.usedBytes());
+            this.loadBackCounter =
+                    metricGroup.counter(
+                            FORST_CACHE_PREFIX + ".lru.loadback", new 
ThreadSafeSimpleCounter());
+            this.evictCounter =
+                    metricGroup.counter(
+                            FORST_CACHE_PREFIX + ".lru.evict", new 
ThreadSafeSimpleCounter());
+            metricGroup.gauge(FORST_CACHE_PREFIX + ".usedBytes", 
cacheLimitPolicy::usedBytes);
             cacheLimitPolicy.registerCustomizedMetrics(FORST_CACHE_PREFIX, 
metricGroup);
         }
         LOG.info(
@@ -86,7 +137,39 @@ public class FileBasedCache extends LruCache<String, 
FileCacheEntry> {
                 cacheLimitPolicy);
     }
 
-    Path getCachePath(Path fromOriginal) {
+    /**
+     * Sets the current thread as a Flink thread. This method is used to mark 
the thread as a Flink
+     * thread, which can be used to determine whether the file access would 
affect the LRU cache
+     * order, or metrics updates.
+     */
+    public static void setFlinkThread() {
+        isFlinkThread.set(true);
+    }
+
+    /**
+     * Checks if the current thread is a Flink thread. This method returns a 
boolean indicating
+     * whether the current thread has been marked as a Flink thread using the 
{@link
+     * #setFlinkThread()} method.
+     *
+     * @return true if the current thread is a Flink thread, false otherwise.
+     */
+    public static boolean isFlinkThread() {
+        return isFlinkThread.get();
+    }
+
+    public void incHitCounter() {
+        if (hitCounter != null && isFlinkThread.get()) {
+            hitCounter.inc();
+        }
+    }
+
+    public void incMissCounter() {
+        if (missCounter != null && isFlinkThread.get()) {
+            missCounter.inc();
+        }
+    }
+
+    private Path getCachePath(Path fromOriginal) {
         return new Path(basePath, fromOriginal.getName());
     }
 
@@ -95,7 +178,7 @@ public class FileBasedCache extends LruCache<String, 
FileCacheEntry> {
         if (closed) {
             return null;
         }
-        FileCacheEntry entry = get(getCachePath(path).toString());
+        FileCacheEntry entry = get(getCachePath(path).toString(), 
isFlinkThread());
         if (entry != null) {
             return entry.open(originalStream);
         } else {
@@ -123,28 +206,191 @@ public class FileBasedCache extends LruCache<String, 
FileCacheEntry> {
         }
     }
 
+    // -----------------------------------------------------------------------
+    // Overriding methods of {@link DoubleListLru} to provide thread safety.
+    // -----------------------------------------------------------------------
+
     @Override
-    FileCacheEntry internalGet(String key, FileCacheEntry value) {
-        if (metricGroup != null) {
-            if (value != null) {
-                hitCounter.inc();
-            } else {
-                missCounter.inc();
+    public FileCacheEntry get(String key, boolean affectOrder) {
+        synchronized (this) {
+            return super.get(key, affectOrder);
+        }
+    }
+
+    @Override
+    public void addFirst(String key, FileCacheEntry value) {
+        synchronized (this) {
+            super.addFirst(key, value);
+        }
+    }
+
+    @Override
+    public void addSecond(String key, FileCacheEntry value) {
+        synchronized (this) {
+            super.addSecond(key, value);
+        }
+    }
+
+    @Override
+    public FileCacheEntry remove(String key) {
+        synchronized (this) {
+            return super.remove(key);
+        }
+    }
+
+    /**
+     * Directly inserts an entry into the second link of the cache when restoring.
+     *
+     * @param originalPath the original path of the file; its name determines the cache path
+     * @param size the size used as the entry size of the new cache entry
+     */
+    public void registerInCache(Path originalPath, long size) {
+        Path cachePath = getCachePath(originalPath);
+        FileCacheEntry fileCacheEntry = new FileCacheEntry(this, originalPath, 
cachePath, size);
+        // We want the newly registered entry to load ASAP, so assign an initial access count
+        // that leaves at most two further accesses before accessBeforePromote is reached
+        // (provided the count is not reset in nodeAccessedAtSecond).
+        fileCacheEntry.accessCountInColdLink = Math.max(0, accessBeforePromote 
- 2);
+        addSecond(cachePath.toString(), fileCacheEntry);
+    }
+
+    /**
+     * Runs {@code doRemoveFile()} of the given entry: asynchronously on the executor service
+     * while this cache is open, or directly on the calling thread once the cache has been closed
+     * (the executor service is shut down on close).
+     *
+     * @param entry the cache entry whose file should be removed
+     */
+    void removeFile(FileCacheEntry entry) {
+        if (!closed) {
+            executorService.submit(entry::doRemoveFile);
+            return;
+        }
+        entry.doRemoveFile();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        executorService.shutdown();
+        for (Tuple2<String, FileCacheEntry> entry : this) {
+            entry.f1.close();
+        }
+    }
+
+    // -----------------------------
+    // Hook methods implementation
+    // -----------------------------
+
+    @Override
+    boolean isSafeToAddFirst(FileCacheEntry value) {
+        return cacheLimitPolicy.isSafeToAdd(value.entrySize);
+    }
+
+    @Override
+    void newNodeCreated(FileCacheEntry value, DoubleListLru<String, 
FileCacheEntry>.Node n) {
+        value.setTouchFunction(
+                () -> {
+                    // provide synchronized access to the LRU cache.
+                    synchronized (FileBasedCache.this) {
+                        accessNode(n);
+                    }
+                });
+    }
+
+    /**
+     * Makes room for a value that has just been added to the first link: entries are shifted
+     * from the end of the first link into the second link until the limit policy no longer
+     * reports an overflow for the entry size, then the entry size is acquired from the policy.
+     */
+    @Override
+    void addedToFirst(FileCacheEntry value) {
+        LOG.trace("Cache entry {} added to first link.", value.cachePath);
+        // NOTE(review): moveMiddleFront() is a no-op once the middle has reached the head, so
+        // termination relies on isOverflow() turning false before that point - confirm against
+        // the CacheLimitPolicy implementations.
+        while (cacheLimitPolicy.isOverflow(value.entrySize)) {
+            moveMiddleFront();
+        }
+        cacheLimitPolicy.acquire(value.entrySize);
+    }
+
+    @Override
+    void addedToSecond(FileCacheEntry value) {
+        LOG.trace("Cache entry {} added to second link.", value.cachePath);
+        value.secondAccessEpoch = (++secondAccessEpoch);
+        tryEvict(value);
+    }
+
+    @Override
+    void removedFromFirst(FileCacheEntry value) {
+        cacheLimitPolicy.release(value.entrySize);
+        value.close();
+    }
+
+    @Override
+    void removedFromSecond(FileCacheEntry value) {
+        value.close();
+    }
+
+    /**
+     * Triggers the (re)loading of an entry that has been moved into the first link. An entry in
+     * status INVALID is switched back to LOADED right away; an entry in status REMOVED is
+     * switched to LOADING and loaded asynchronously on the executor service. The load-back
+     * metric, if registered, is updated for every successful transition to LOADED.
+     *
+     * @param entry the cache entry moved to the first link
+     */
+    @Override
+    void movedToFirst(FileCacheEntry entry) {
+        // here we won't consider the cache limit policy.
+        // since there will be promotedToFirst called after this.
+        LOG.trace("Cache entry {} moved to first link.", entry.cachePath);
+        // trigger the loading
+        if (entry.switchStatus(
+                FileCacheEntry.EntryStatus.INVALID, 
FileCacheEntry.EntryStatus.LOADED)) {
+            // just a try
+            entry.loaded();
+            if (loadBackCounter != null) {
+                loadBackCounter.inc();
             }
         }
-        return value;
+        if (entry.switchStatus(
+                FileCacheEntry.EntryStatus.REMOVED, 
FileCacheEntry.EntryStatus.LOADING)) {
+            executorService.submit(
+                    () -> {
+                        if 
(entry.checkStatus(FileCacheEntry.EntryStatus.LOADING)) {
+                            Path path = entry.load();
+                            if (path == null) {
+                                // Nothing was loaded: fall back to REMOVED.
+                                entry.switchStatus(
+                                        FileCacheEntry.EntryStatus.LOADING,
+                                        FileCacheEntry.EntryStatus.REMOVED);
+                            } else if (entry.switchStatus(
+                                    FileCacheEntry.EntryStatus.LOADING,
+                                    FileCacheEntry.EntryStatus.LOADED)) {
+                                entry.loaded();
+                                if (loadBackCounter != null) {
+                                    loadBackCounter.inc();
+                                }
+                            } else {
+                                // The entry left the LOADING status while the file was being
+                                // loaded, so the loaded file is not used: delete it.
+                                try {
+                                    path.getFileSystem().delete(path, false);
+                                } catch (IOException e) {
+                                    // Best effort: a failed deletion only leaves a stale file
+                                    // behind and must not fail the loader thread, but it should
+                                    // not go unnoticed either.
+                                    LOG.warn(
+                                            "Failed to delete unused cache file {}.", path, e);
+                                }
+                            }
+                        }
+                    });
+        }
     }
 
     @Override
-    void internalInsert(String key, FileCacheEntry value) {}
+    void movedToSecond(FileCacheEntry value) {
+        // trigger the evicting
+        LOG.trace("Cache entry {} moved to second link.", value.cachePath);
+        cacheLimitPolicy.release(value.entrySize);
+        tryEvict(value);
+    }
 
     @Override
-    void internalRemove(FileCacheEntry value) {
-        value.invalidate();
+    boolean nodeAccessedAtSecond(FileCacheEntry value) {
+        if (secondAccessEpoch - value.secondAccessEpoch >= getSecondSize() / 
2) {
+            // current entry is at last half of the second link
+            value.accessCountInColdLink = 0;
+            secondAccessEpoch++;
+        }
+        value.secondAccessEpoch = secondAccessEpoch;
+        return value.evictCount < promoteLimit
+                && ++value.accessCountInColdLink >= accessBeforePromote;
     }
 
     @Override
-    long getValueResource(FileCacheEntry value) {
-        return value.entrySize;
+    void promotedToFirst(FileCacheEntry value) {
+        value.accessCountInColdLink = 0;
+        while (cacheLimitPolicy.isOverflow(value.entrySize)) {
+            moveMiddleFront();
+        }
+        cacheLimitPolicy.acquire(value.entrySize);
+    }
+
+    /**
+     * Tries to evict the cached file of the given entry by invalidating it. When the entry has
+     * actually been invalidated, its eviction count is increased (the count is compared against
+     * the promotion limit when the entry is accessed in the second link) and the eviction
+     * metric, if registered, is updated.
+     *
+     * @param value the cache entry to evict
+     */
+    private void tryEvict(FileCacheEntry value) {
+        if (value.invalidate()) {
+            // The eviction count drives the promotion limit, so it has to be maintained even
+            // when no metric group (and hence no evict counter) is registered.
+            value.evictCount++;
+            if (evictCounter != null) {
+                evictCounter.inc();
+            }
+        }
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileCacheEntry.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileCacheEntry.java
index e87914b56de..4b1e9c70672 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileCacheEntry.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileCacheEntry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.forst.fs.cache;
 
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.asyncprocessing.ReferenceCounted;
@@ -29,71 +30,301 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A file cache entry that encapsulates file and the size of the file, and 
provides methods to read
  * or write file. Not thread safe.
  */
-public class FileCacheEntry extends ReferenceCounted {
+public final class FileCacheEntry extends ReferenceCounted<Object> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FileCacheEntry.class);
+    private static final int READ_BUFFER_SIZE = 64 * 1024;
 
-    /** The file system of cache. */
+    /** The file-based cache that this entry belongs to. */
+    final FileBasedCache fileBasedCache;
+
+    /** The file system of the cache. */
     final FileSystem cacheFs;
 
-    /** The original path of file. */
+    /** The original path of the file. */
     final Path originalPath;
 
-    /** The path in cache. */
+    /** The path in the cache. */
     final Path cachePath;
 
-    /** The size of file. */
-    long entrySize;
-
-    volatile boolean closed;
+    /** The size of the file. */
+    final long entrySize;
 
+    /** A queue of opened streams associated with this cache entry. */
     final Queue<CachedDataInputStream> openedStreams;
 
+    /** The current status of the cache entry. */
+    final AtomicReference<EntryStatus> status;
+
+    /** A function to be called when the cache entry is accessed. */
+    private Runnable touchFunction;
+
+    /** The epoch of the second link recorded at this entry's last access there. */
+    long secondAccessEpoch = 0L;
+
+    /** The count of accesses to this entry while it is in the cold link. */
+    int accessCountInColdLink = 0;
+
+    /** The count of times this entry has been evicted. */
+    int evictCount = 0;
+
+    /** The status of a file cache entry. */
+    public enum EntryStatus {
+        /** The cache entry is fully loaded and available for use. */
+        LOADED,
+
+        /** The cache entry is in the process of being loaded. */
+        LOADING,
+
+        /** The cache entry is invalid and should not be used. */
+        INVALID,
+
+        /** The cache entry is in the process of being removed. */
+        REMOVING,
+
+        /** The cache entry has been removed and is not available. Can be 
reopened for reading. */
+        REMOVED,
+
+        /** The cache entry is closed and no longer available for use 
permanently. */
+        CLOSED
+    }
+
+    /**
+     * Constructs a new FileCacheEntry. This entry is initially in the REMOVED 
state with no
+     * reference.
+     *
+     * @param fileBasedCache the file-based cache that this entry belongs to
+     * @param originalPath the original path of the file
+     * @param cachePath the path in the cache
+     * @param entrySize the size of the file
+     */
     FileCacheEntry(
             FileBasedCache fileBasedCache, Path originalPath, Path cachePath, 
long entrySize) {
-        super(1);
+        super(0);
+        this.fileBasedCache = fileBasedCache;
         this.cacheFs = fileBasedCache.cacheFs;
         this.originalPath = originalPath;
         this.cachePath = cachePath;
         this.entrySize = entrySize;
-        this.closed = false;
         this.openedStreams = new LinkedBlockingQueue<>();
+        this.status = new AtomicReference<>(EntryStatus.REMOVED);
+        LOG.trace("Create new cache entry {}.", cachePath);
     }
 
+    /**
+     * Opens a new {@link CachedDataInputStream} from this cache entry. If the 
cache stream is
+     * available, it will be used; otherwise, the original stream will be 
used. But the cache stream
+     * will be used once available. The opened stream is added to the queue of 
opened streams
+     * associated with this cache entry.
+     *
+     * @param originalStream the original input stream to be used if the cache 
stream is not
+     *     available
+     * @return a new {@link CachedDataInputStream} for reading data
+     * @throws IOException if an I/O error occurs while opening the stream
+     */
     public CachedDataInputStream open(FSDataInputStream originalStream) throws 
IOException {
-        if (!closed && tryRetain() > 0) {
+        LOG.trace("Open new stream for cache entry {}.", cachePath);
+        FSDataInputStream cacheStream = getCacheStream();
+        if (cacheStream != null) {
             CachedDataInputStream inputStream =
-                    new CachedDataInputStream(this, cacheFs.open(cachePath), 
originalStream);
+                    new CachedDataInputStream(fileBasedCache, this, 
cacheStream, originalStream);
             openedStreams.add(inputStream);
             release();
             return inputStream;
         } else {
-            return null;
+            CachedDataInputStream inputStream =
+                    new CachedDataInputStream(fileBasedCache, this, 
originalStream);
+            openedStreams.add(inputStream);
+            return inputStream;
         }
     }
 
-    public void invalidate() {
-        if (!closed) {
-            closed = true;
-            release();
+    /**
+     * Retrieves the cached input stream for this cache entry if it is 
available and the entry is in
+     * a valid state. The method attempts to open the cached stream if the 
entry is in the LOADED
+     * state and retains a reference to it.
+     *
+     * @return the cached input stream if available, otherwise null
+     * @throws IOException if an I/O error occurs while opening the cached 
stream
+     */
+    FSDataInputStream getCacheStream() throws IOException {
+        if (status.get() == EntryStatus.LOADED && tryRetain() > 0) {
+            return cacheFs.open(cachePath);
+        }
+        return null;
+    }
+
+    /**
+     * Sets the touch function associated with this cache entry. The reason 
for setting the touch
+     * function is to update the entry order in {@link FileBasedCache}. The 
touch function is not
+     * initialized in constructor, since the node in LRU should be created 
before the touch function
+     * is available, and this all happens after this entry is built.
+     */
+    void setTouchFunction(Runnable touchFunction) {
+        this.touchFunction = touchFunction;
+    }
+
+    /**
+     * Invokes the touch function associated with this cache entry. This 
method is called to
+     * indicate that the cache entry has been accessed, and as a result, the 
entry order in {@link
+     * FileBasedCache} is expected to be updated.
+     */
+    void touch() {
+        if (touchFunction != null) {
+            touchFunction.run();
+        }
+    }
+
+    /**
+     * Loads the file from the original path to the cache path. This method 
reads the file from the
+     * original path and writes it to the cache path. If the file is 
successfully loaded, the cache
+     * path is returned. If an I/O error occurs during the loading process, 
null is returned.
+     *
+     * @see FileBasedCache#movedToFirst(FileCacheEntry)
+     * @return the cache path if the file is successfully loaded, otherwise 
null.
+     */
+    Path load() {
+        FSDataInputStream inputStream = null;
+        FSDataOutputStream outputStream = null;
+        try {
+            final byte[] buffer = new byte[READ_BUFFER_SIZE];
+
+            inputStream = originalPath.getFileSystem().open(originalPath, 
READ_BUFFER_SIZE);
+
+            outputStream = cacheFs.create(cachePath, 
FileSystem.WriteMode.OVERWRITE);
+
+            long maxTransferBytes =
+                    
originalPath.getFileSystem().getFileStatus(originalPath).getLen();
+
+            while (maxTransferBytes > 0) {
+                int maxReadBytes = (int) Math.min(maxTransferBytes, 
READ_BUFFER_SIZE);
+                int readBytes = inputStream.read(buffer, 0, maxReadBytes);
+
+                if (readBytes == -1) {
+                    break;
+                }
+
+                outputStream.write(buffer, 0, readBytes);
+
+                maxTransferBytes -= readBytes;
+            }
+            return cachePath;
+        } catch (IOException e) {
+            return null;
+        } finally {
+            try {
+                if (inputStream != null) {
+                    inputStream.close();
+                }
+                if (outputStream != null) {
+                    outputStream.close();
+                }
+            } catch (IOException e) {
+                // ignore
+            }
         }
     }
 
+    /**
+     * The reference count can reach 0 in only two scenarios: 1. The cache 
entry is invalidated
+     * and its reference is released, see {@link #invalidate()}. 2. The cache 
entry is closed and
+     * its reference is released, see {@link #close()}.
+     */
     @Override
     protected void referenceCountReachedZero(@Nullable Object o) {
+        if (switchStatus(EntryStatus.INVALID, EntryStatus.REMOVING)
+                || checkStatus(EntryStatus.CLOSED)) {
+            fileBasedCache.removeFile(this);
+        }
+    }
+
+    /**
+     * Removes the cache file associated with this cache entry. This method 
deletes the cache file
+     * and closes all opened streams associated with this cache entry.
+     */
+    synchronized void doRemoveFile() {
         try {
-            for (CachedDataInputStream stream : openedStreams) {
-                stream.close();
+            Iterator<CachedDataInputStream> iterator = 
openedStreams.iterator();
+            while (iterator.hasNext()) {
+                CachedDataInputStream stream = iterator.next();
+                if (stream.isClosed()) {
+                    iterator.remove();
+                } else {
+                    stream.closeCachedStream();
+                }
             }
             cacheFs.delete(cachePath, false);
+            if (status.get() != EntryStatus.CLOSED) {
+                status.set(FileCacheEntry.EntryStatus.REMOVED);
+            }
         } catch (Exception e) {
             LOG.warn("Failed to delete cache entry {}.", cachePath, e);
         }
     }
+
+    // -----------------------------------------------------
+    // Status change methods, invoked by different threads.
+    // -----------------------------------------------------
+
+    synchronized void loaded() {
+        // 0 -> 1
+        if (checkStatus(EntryStatus.LOADED)) {
+            retain();
+        }
+    }
+
+    /**
+     * Invalidate the cache entry if it is LOADED.
+     *
+     * @return true if the cache entry is actually invalidated, false 
otherwise.
+     */
+    synchronized boolean invalidate() {
+        if (switchStatus(EntryStatus.LOADED, EntryStatus.INVALID)) {
+            release();
+            return true;
+        }
+        return false;
+    }
+
+    synchronized void close() {
+        if (getAndSetStatus(EntryStatus.CLOSED) == EntryStatus.LOADED) {
+            release();
+        } else {
+            status.set(EntryStatus.CLOSED);
+        }
+    }
+
+    // ----------------------------
+    // Status related methods
+    // ----------------------------
+
+    boolean switchStatus(EntryStatus from, EntryStatus to) {
+        if (status.compareAndSet(from, to)) {
+            LOG.trace(
+                    "Cache {} (for {}) Switch status from {} to {}.",
+                    originalPath,
+                    cachePath,
+                    from,
+                    to);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    boolean checkStatus(EntryStatus to) {
+        return status.get() == to;
+    }
+
+    EntryStatus getAndSetStatus(EntryStatus to) {
+        return status.getAndSet(to);
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/LruCache.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/LruCache.java
deleted file mode 100644
index a67287ea1a2..00000000000
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/LruCache.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.state.forst.fs.cache;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import java.io.Closeable;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * Uniformed LRU Cache.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-abstract class LruCache<K, V> implements Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(LruCache.class);
-    private final Object lock = new Object();
-
-    @GuardedBy("lock")
-    protected CacheLimitPolicy cacheLimitPolicy;
-
-    @GuardedBy("lock")
-    private final LruHashMap dataMap;
-
-    /** Internal underlying data map. */
-    class LruHashMap extends LinkedHashMap<K, V> {
-
-        private static final int DEFAULT_SIZE = 1024;
-
-        /** Maximum capacity. */
-        private final int capacity;
-
-        LruHashMap(int capacity) {
-            super(DEFAULT_SIZE, 0.75f, true);
-            this.capacity = capacity;
-        }
-
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<K, V> entry) {
-            if (capacity > 0 && size() > capacity) {
-                internalRemove(entry.getValue());
-                cacheLimitPolicy.release(getValueResource(entry.getValue()));
-                return true;
-            }
-            return false;
-        }
-    }
-
-    LruCache(int capacity, CacheLimitPolicy cacheLimitPolicy) {
-        this.cacheLimitPolicy = cacheLimitPolicy;
-        this.dataMap = new LruHashMap(capacity);
-    }
-
-    public boolean put(K key, V value) {
-        synchronized (lock) {
-            if (!cacheLimitPolicy.isSafeToAdd(getValueResource(value))) {
-                return false;
-            }
-            V previous = dataMap.put(key, value);
-            if (previous != null) {
-                internalRemove(previous);
-                cacheLimitPolicy.release(getValueResource(previous));
-            }
-        }
-        internalInsert(key, value);
-        LOG.trace(
-                "Put {},{} into cache, current cacheLimiter {}",
-                key,
-                getValueResource(value),
-                cacheLimitPolicy.toString());
-        tryTrim(getValueResource(value));
-        synchronized (lock) {
-            cacheLimitPolicy.acquire(getValueResource(value));
-        }
-        return true;
-    }
-
-    public V get(K key) {
-        synchronized (lock) {
-            V value = dataMap.get(key);
-            return internalGet(key, value);
-        }
-    }
-
-    public V remove(K key) {
-        synchronized (lock) {
-            V previous = dataMap.remove(key);
-            if (previous != null) {
-                internalRemove(previous);
-                cacheLimitPolicy.release(getValueResource(previous));
-            }
-            return previous;
-        }
-    }
-
-    public int getSize() {
-        synchronized (lock) {
-            return dataMap.size();
-        }
-    }
-
-    /**
-     * Try to evict the old entries in cache until the current occupied 
resource is less than the
-     * resource.
-     */
-    private void tryTrim(long toAddResource) {
-        synchronized (lock) {
-            if (!cacheLimitPolicy.isOverflow(toAddResource)) { // infinite 
resource, no need to trim
-                return;
-            }
-            while (cacheLimitPolicy.isOverflow(toAddResource) && 
!dataMap.isEmpty()) {
-                Map.Entry<K, V> toRemove = 
dataMap.entrySet().iterator().next();
-                LOG.trace("evict {} {}", toRemove.getKey(), toAddResource);
-                dataMap.remove(toRemove.getKey());
-                internalRemove(toRemove.getValue());
-                
cacheLimitPolicy.release(getValueResource(toRemove.getValue()));
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        synchronized (lock) {
-            for (V value : dataMap.values()) {
-                internalRemove(value);
-            }
-            dataMap.clear();
-        }
-    }
-
-    abstract V internalGet(K key, V value);
-
-    abstract void internalInsert(K key, V value);
-
-    abstract void internalRemove(V value);
-
-    abstract long getValueResource(V value);
-}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
index 5d2bfe1dc94..92bcb0bb6fe 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
@@ -93,6 +93,7 @@ import java.util.function.Function;
 import static 
org.apache.flink.state.forst.ForStConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
 import static 
org.apache.flink.state.forst.ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
 import static 
org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
+import static 
org.apache.flink.state.forst.fs.cache.FileBasedCache.setFlinkThread;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -302,6 +303,9 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
                         numberOfKeyGroups);
 
         try {
+            // Current thread (task thread) must be a Flink thread to enable 
proper cache
+            // management.
+            setFlinkThread();
             // Variables for snapshot strategy when incremental checkpoint is 
enabled
             UUID backendUID = UUID.randomUUID();
             SortedMap<Long, Collection<HandleAndLocalPath>> 
materializedSstFiles = new TreeMap<>();
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
index 406604c3497..4cf2b51e01e 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.forst.fs;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.ByteBufferReadable;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -54,6 +55,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
+import static 
org.apache.flink.state.forst.ForStOptions.CACHE_LRU_ACCESS_BEFORE_PROMOTION;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ForStFlinkFileSystem}. */
@@ -69,7 +71,7 @@ public class ForStFlinkFileSystemTest {
                     {null},
                     {
                         new FileBasedCache(
-                                1024 * 3,
+                                new Configuration(),
                                 new SizeBasedCacheLimitPolicy(1024 * 3),
                                 FileSystem.getLocalFileSystem(),
                                 new 
org.apache.flink.core.fs.Path(tempDir.toString() + "/cache"),
@@ -251,7 +253,7 @@ public class ForStFlinkFileSystemTest {
                 };
         FileBasedCache cache =
                 new FileBasedCache(
-                        250,
+                        new Configuration(),
                         cacheLimitPolicy,
                         FileSystem.getLocalFileSystem(),
                         cachePath,
@@ -274,15 +276,17 @@ public class ForStFlinkFileSystemTest {
         org.apache.flink.core.fs.Path cachePath1 =
                 new org.apache.flink.core.fs.Path(cachePath, 
sstRealPath1.getName());
         os1.write(tmpBytes);
+        os1.write(76);
         os1.write(89);
         os1.sync();
         os1.close();
         assertThat(fileSystem.exists(sstRemotePath1)).isTrue();
         assertThat(cachePath.getFileSystem().exists(cachePath1)).isTrue();
-        
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(234L);
+        
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L);
         
assertThat(registeredCounters.get("forst.fileCache.hit").getCount()).isEqualTo(0L);
         
assertThat(registeredCounters.get("forst.fileCache.miss").getCount()).isEqualTo(0L);
-        FileCacheEntry cacheEntry1 = cache.get(cachePath.getPath() + "/" + 
sstRealPath1.getName());
+        FileCacheEntry cacheEntry1 =
+                cache.get(cachePath.getPath() + "/" + sstRealPath1.getName(), 
false);
         assertThat(cacheEntry1).isNotNull();
         assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
 
@@ -291,7 +295,14 @@ public class ForStFlinkFileSystemTest {
         assertThat(is.read(tmpBytes)).isEqualTo(233);
         assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
         assertThat(cacheEntry1.getReferenceCount()).isEqualTo(1);
-        
assertThat(registeredCounters.get("forst.fileCache.hit").getCount()).isEqualTo(2L);
+        
assertThat(registeredCounters.get("forst.fileCache.hit").getCount()).isEqualTo(0L);
+        
assertThat(registeredCounters.get("forst.fileCache.miss").getCount()).isEqualTo(0L);
+
+        // set flink thread to enable the metrics
+        FileBasedCache.setFlinkThread();
+        assertThat(is.read()).isEqualTo(76);
+        
assertThat(registeredCounters.get("forst.fileCache.hit").getCount()).isEqualTo(1L);
+
         // evict
         org.apache.flink.core.fs.Path sstRemotePath2 =
                 new org.apache.flink.core.fs.Path(remotePath, "2.sst");
@@ -311,6 +322,23 @@ public class ForStFlinkFileSystemTest {
         
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(233L);
         // read after evict
         assertThat(is.read()).isEqualTo(89);
+
+        // more files to fill the cold link
+        org.apache.flink.core.fs.Path sstRemotePath3 =
+                new org.apache.flink.core.fs.Path(remotePath, "3.sst");
+        ByteBufferWritableFSDataOutputStream os3 = 
fileSystem.create(sstRemotePath3);
+        os3.write(tmpBytes);
+        os3.sync();
+        os3.close();
+
+        // more accesses to trigger the load-back
+        for (int i = 0; i < CACHE_LRU_ACCESS_BEFORE_PROMOTION.defaultValue() - 
1; i++) {
+            
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue())
+                    .isEqualTo(233L);
+            is.seek(0);
+        }
+        
assertThat(registeredGauges.get("forst.fileCache.usedBytes").getValue()).isEqualTo(235L);
+
         is.close();
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/cache/DoubleListLruTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/cache/DoubleListLruTest.java
new file mode 100644
index 00000000000..a0fbe6630e0
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/cache/DoubleListLruTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs.cache;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link DoubleListLru}. */
+class DoubleListLruTest {
+
+    static class TestDoubleListLru<K, V> extends DoubleListLru<K, V> {
+        private final HashMap<String, V> invocation = new HashMap<>();
+        private final Function<V, Boolean> promotionPolicy;
+
+        public TestDoubleListLru(Function<V, Boolean> promotionPolicy) {
+            super();
+            resetInvocations();
+            this.promotionPolicy = promotionPolicy;
+        }
+
+        private boolean nothingCalled() {
+            return invocation.isEmpty();
+        }
+
+        private void resetInvocations() {
+            invocation.clear();
+        }
+
+        public V getInvocation(String methodName) {
+            return invocation.getOrDefault(methodName, null);
+        }
+
+        @Override
+        boolean isSafeToAddFirst(V value) {
+            return true;
+        }
+
+        @Override
+        void newNodeCreated(V value, DoubleListLru<K, V>.Node n) {
+            invocation.put("newNodeCreated", value);
+        }
+
+        @Override
+        void addedToFirst(V value) {
+            invocation.put("addedToFirst", value);
+        }
+
+        @Override
+        void addedToSecond(V value) {
+            invocation.put("addedToSecond", value);
+        }
+
+        @Override
+        void removedFromFirst(V value) {
+            invocation.put("removedFromFirst", value);
+        }
+
+        @Override
+        void removedFromSecond(V value) {
+            invocation.put("removedFromSecond", value);
+        }
+
+        @Override
+        void movedToFirst(V value) {
+            invocation.put("movedToFirst", value);
+        }
+
+        @Override
+        void movedToSecond(V value) {
+            invocation.put("movedToSecond", value);
+        }
+
+        @Override
+        boolean nodeAccessedAtSecond(V value) {
+            invocation.put("nodeAccessedAtSecond", value);
+            return promotionPolicy.apply(value);
+        }
+
+        @Override
+        void promotedToFirst(V value) {
+            invocation.put("promotedToFirst", value);
+        }
+    }
+
+    private TestDoubleListLru<String, Integer> cache;
+
+    @BeforeEach
+    void setUp() {
+        cache = new TestDoubleListLru<>(e -> e >= 4);
+    }
+
+    @Test
+    void testAddFirst() {
+        cache.addFirst("one", 1);
+        assertEquals(1, cache.size());
+        assertEquals(1, cache.get("one", false));
+        assertNull(cache.getMiddle());
+        assertEquals(1, cache.getInvocation("addedToFirst"));
+    }
+
+    @Test
+    void testAddSecond() {
+        cache.addFirst("one", 1);
+        cache.addSecond("two", 2);
+        assertEquals(2, cache.size());
+        assertEquals(2, cache.getMiddle());
+        assertEquals(2, cache.getInvocation("addedToSecond"));
+    }
+
+    @Test
+    void testMoveMiddle() {
+        cache.addFirst("one", 1);
+        cache.addSecond("two", 2);
+        cache.moveMiddleBack();
+        assertNull(cache.getMiddle());
+        assertEquals(2, cache.getInvocation("movedToFirst"));
+        cache.moveMiddleFront();
+        assertEquals(2, cache.getMiddle());
+        assertEquals(2, cache.getInvocation("movedToSecond"));
+    }
+
+    @Test
+    void testRemove() {
+        cache.addFirst("one", 1);
+        cache.addFirst("two", 2);
+        cache.remove("two");
+        assertEquals(1, cache.size());
+        assertNull(cache.get("two", false));
+        assertEquals(2, cache.getInvocation("removedFromFirst"));
+    }
+
+    @Test
+    void testGet() {
+        cache.addFirst("one", 1);
+        cache.addFirst("two", 2);
+        cache.addSecond("three", 3);
+        cache.addSecond("four", 4);
+        cache.resetInvocations();
+        cache.get("one", true);
+        assertTrue(cache.nothingCalled());
+        cache.get("three", true);
+        assertEquals(3, cache.getInvocation("nodeAccessedAtSecond"));
+        assertNull(cache.getInvocation("promotedToFirst"));
+
+        cache.resetInvocations();
+        cache.get("four", false);
+        assertTrue(cache.nothingCalled());
+
+        cache.get("four", true);
+        assertEquals(4, cache.getInvocation("nodeAccessedAtSecond"));
+        assertEquals(4, cache.getInvocation("promotedToFirst"));
+        assertEquals(4, cache.getInvocation("movedToFirst"));
+    }
+}


Reply via email to