This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9878e561af3 [FLINK-36935][state/forst] Implement fast link for ForStFileSystem (#25818)
9878e561af3 is described below

commit 9878e561af30a90dd5491e9df9c99c8c17734a82
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Mon Jan 6 21:00:25 2025 +0800

    [FLINK-36935][state/forst] Implement fast link for ForStFileSystem (#25818)
---
 .../flink/state/forst/ForStResourceContainer.java  |   5 +-
 .../flink/state/forst/fs/ForStFlinkFileSystem.java | 216 ++++++------------
 .../forst/fs/filemapping/FileMappingManager.java   | 251 +++++++++++++++++++++
 .../state/forst/fs/filemapping/MappingEntry.java   |  87 +++++++
 .../restore/ForStIncrementalRestoreOperation.java  |  22 +-
 .../state/forst/fs/FileMappingManagerTest.java     | 238 +++++++++++++++++++
 .../state/forst/fs/ForStFlinkFileSystemTest.java   |  15 +-
 7 files changed, 674 insertions(+), 160 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index 53139ca07fa..08ee1a7f7fa 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -396,8 +396,9 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         }
     }
 
-    private static void clearDirectories(Path basePath) throws IOException {
-        FileSystem fileSystem = basePath.getFileSystem();
+    private void clearDirectories(Path basePath) throws IOException {
+        FileSystem fileSystem =
+                forStFileSystem != null ? forStFileSystem : 
basePath.getFileSystem();
         if (fileSystem.exists(basePath)) {
             fileSystem.delete(basePath, true);
         }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index e760d07515e..433f5f15c27 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -19,7 +19,6 @@
 package org.apache.flink.state.forst.fs;
 
 import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -33,6 +32,7 @@ import 
org.apache.flink.state.forst.fs.cache.CachedDataOutputStream;
 import org.apache.flink.state.forst.fs.cache.FileBasedCache;
 import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy;
 import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
+import org.apache.flink.state.forst.fs.filemapping.FileMappingManager;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -40,7 +40,8 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.util.function.Function;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -58,14 +59,10 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     private static final long SST_FILE_SIZE = 1024 * 1024 * 64;
 
-    private static final Function<String, Boolean> miscFileFilter = s -> 
!s.endsWith(".sst");
-
     private final FileSystem localFS;
     private final FileSystem delegateFS;
-    private final String remoteBase;
-    private final Function<String, Boolean> localFileFilter;
-    private final String localBase;
     @Nullable private final FileBasedCache fileBasedCache;
+    private final FileMappingManager fileMappingManager;
 
     public ForStFlinkFileSystem(
             FileSystem delegateFS,
@@ -74,10 +71,9 @@ public class ForStFlinkFileSystem extends FileSystem {
             @Nullable FileBasedCache fileBasedCache) {
         this.localFS = FileSystem.getLocalFileSystem();
         this.delegateFS = delegateFS;
-        this.localFileFilter = miscFileFilter;
-        this.remoteBase = remoteBase;
-        this.localBase = localBase;
         this.fileBasedCache = fileBasedCache;
+        this.fileMappingManager =
+                new FileMappingManager(delegateFS, localFS, remoteBase, 
localBase);
     }
 
     /**
@@ -142,10 +138,10 @@ public class ForStFlinkFileSystem extends FileSystem {
     @Override
     public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode 
overwriteMode)
             throws IOException {
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
+        FileMappingManager.RealPath realPath = 
fileMappingManager.createFile(path);
+        if (realPath.isLocal) {
             return new ByteBufferWritableFSDataOutputStream(
-                    localFS.create(localPathTuple.f1, overwriteMode));
+                    localFS.create(realPath.path, overwriteMode));
         }
 
         FSDataOutputStream originalOutputStream = delegateFS.create(path, 
overwriteMode);
@@ -157,19 +153,22 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     @Override
     public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) 
throws IOException {
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
+        FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
+        Preconditions.checkNotNull(realPath);
+        if (realPath.isLocal) {
             return new ByteBufferReadableFSDataInputStream(
-                    () -> localFS.open(localPathTuple.f1, bufferSize),
+                    () -> localFS.open(realPath.path, bufferSize),
                     DEFAULT_INPUT_STREAM_CAPACITY,
-                    localFS.getFileStatus(localPathTuple.f1).getLen());
+                    localFS.getFileStatus(realPath.path).getLen());
         }
-        FileStatus fileStatus = checkNotNull(getFileStatus(path));
+        FileStatus fileStatus = checkNotNull(getFileStatus(realPath.path));
         return new ByteBufferReadableFSDataInputStream(
                 () -> {
-                    FSDataInputStream inputStream = delegateFS.open(path, 
bufferSize);
+                    FSDataInputStream inputStream = 
delegateFS.open(realPath.path, bufferSize);
                     CachedDataInputStream cachedDataInputStream =
-                            fileBasedCache == null ? null : 
fileBasedCache.open(path, inputStream);
+                            fileBasedCache == null
+                                    ? null
+                                    : fileBasedCache.open(realPath.path, 
inputStream);
                     return cachedDataInputStream == null ? inputStream : 
cachedDataInputStream;
                 },
                 DEFAULT_INPUT_STREAM_CAPACITY,
@@ -178,19 +177,22 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     @Override
     public ByteBufferReadableFSDataInputStream open(Path path) throws 
IOException {
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
+        FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
+        Preconditions.checkNotNull(realPath);
+        if (realPath.isLocal) {
             return new ByteBufferReadableFSDataInputStream(
-                    () -> localFS.open(localPathTuple.f1),
+                    () -> localFS.open(realPath.path),
                     DEFAULT_INPUT_STREAM_CAPACITY,
-                    localFS.getFileStatus(localPathTuple.f1).getLen());
+                    localFS.getFileStatus(realPath.path).getLen());
         }
-        FileStatus fileStatus = checkNotNull(getFileStatus(path));
+        FileStatus fileStatus = checkNotNull(getFileStatus(realPath.path));
         return new ByteBufferReadableFSDataInputStream(
                 () -> {
-                    FSDataInputStream inputStream = delegateFS.open(path);
+                    FSDataInputStream inputStream = 
delegateFS.open(realPath.path);
                     CachedDataInputStream cachedDataInputStream =
-                            fileBasedCache == null ? null : 
fileBasedCache.open(path, inputStream);
+                            fileBasedCache == null
+                                    ? null
+                                    : fileBasedCache.open(realPath.path, 
inputStream);
                     return cachedDataInputStream == null ? inputStream : 
cachedDataInputStream;
                 },
                 DEFAULT_INPUT_STREAM_CAPACITY,
@@ -199,28 +201,7 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     @Override
     public boolean rename(Path src, Path dst) throws IOException {
-        // The rename is not atomic for ForSt. Some FileSystems e.g. HDFS, OSS does not allow a
-        // renaming if the target already exists. So, we delete the target before attempting the
-        // rename.
-
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(src);
-        if (localPathTuple.f0) {
-            Path localSrc = localPathTuple.f1;
-            Path localDst = tryBuildLocalPath(dst).f1;
-            FileStatus fileStatus = localFS.getFileStatus(localSrc);
-            boolean success = localFS.rename(localSrc, localDst);
-            if (!fileStatus.isDir()) {
-                return success;
-            }
-        }
-
-        if (delegateFS.exists(dst)) {
-            boolean deleted = delegateFS.delete(dst, false);
-            if (!deleted) {
-                throw new IOException("Fail to delete dst path: " + dst);
-            }
-        }
-        return delegateFS.rename(src, dst);
+        // The mapping table decides what a rename means: a mapped file only has its mapping moved
+        // (an existing mapping at dst is released first), while anything else is treated as a
+        // directory whose mapped files are re-mapped under dst before src is deleted.
+        return fileMappingManager.renameFile(src.toString(), dst.toString());
     }
 
     @Override
@@ -240,29 +221,41 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     @Override
     public boolean exists(final Path f) throws IOException {
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f);
-        if (localPathTuple.f0) {
-            return localFS.exists(localPathTuple.f1);
+        FileMappingManager.RealPath realPath = fileMappingManager.realPath(f);
+        if (realPath == null) {
+            // Unmapped path: directories are never entries of the mapping table, so only an
+            // existing directory on the delegate counts; an unmapped plain file reports false.
+            return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir();
         }
-        return delegateFS.exists(f);
+
+        boolean status = false;
+        if (realPath.isLocal) {
+            // Local misc file; if the local copy is missing, fall back to the logical remote path.
+            status |= localFS.exists(realPath.path);
+            if (!status) {
+                status = delegateFS.exists(f);
+            }
+        } else {
+            // Remote file: check the physical source path, which differs from f for linked or
+            // renamed files.
+            status = delegateFS.exists(realPath.path);
+        }
+        return status;
     }
 
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            return localFS.getFileStatus(localPathTuple.f1);
+        FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
+        if (realPath == null) {
+            // Not a mapped file: either a directory (directories are never mapped) or the physical
+            // source of a renamed/linked file whose own name is no longer mapped -- open() resolves
+            // realPath.path through this method. Let the delegate answer; per the FileSystem
+            // contract it signals a missing path with FileNotFoundException.
+            return delegateFS.getFileStatus(path);
+        }
+        if (realPath.isLocal) {
+            return localFS.getFileStatus(realPath.path);
         }
-        return delegateFS.getFileStatus(path);
+        return delegateFS.getFileStatus(realPath.path);
     }
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 
long len)
             throws IOException {
         Path path = file.getPath();
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            FileStatus localFile = localFS.getFileStatus(localPathTuple.f1);
+        FileMappingManager.RealPath realPath = 
fileMappingManager.realPath(path);
+        Preconditions.checkNotNull(realPath);
+        if (realPath.isLocal) {
+            FileStatus localFile = localFS.getFileStatus(realPath.path);
             return localFS.getFileBlockLocations(localFile, start, len);
         }
         return delegateFS.getFileBlockLocations(file, start, len);
@@ -270,78 +263,28 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
-        FileStatus[] localFiles = new FileStatus[0];
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            localFiles = localFS.listStatus(localPathTuple.f1);
+        // Lists the mapped files that are direct children of path. Directories are never mapped,
+        // so they do not appear in the result.
+        List<FileStatus> fileStatuses = new ArrayList<>();
+        String pathStr = path.toString();
+        if (!pathStr.endsWith("/")) {
+            pathStr += "/";
         }
-        int localFileNum = localFiles == null ? 0 : localFiles.length;
-        FileStatus[] remoteFiles = delegateFS.listStatus(path);
-        if (localFileNum == 0) {
-            return remoteFiles;
-        }
-        int remoteFileNum = remoteFiles == null ? 0 : remoteFiles.length;
-        FileStatus[] fileStatuses = new FileStatus[localFileNum + remoteFileNum];
-        for (int index = 0; index < localFileNum; index++) {
-            final FileStatus localFile = localFiles[index];
-            fileStatuses[index] =
-                    new FileStatus() {
-                        @Override
-                        public long getLen() {
-                            return localFile.getLen();
-                        }
-
-                        @Override
-                        public long getBlockSize() {
-                            return localFile.getBlockSize();
-                        }
-
-                        @Override
-                        public short getReplication() {
-                            return localFile.getReplication();
-                        }
-
-                        @Override
-                        public long getModificationTime() {
-                            return localFile.getModificationTime();
-                        }
-
-                        @Override
-                        public long getAccessTime() {
-                            return localFile.getAccessTime();
-                        }
-
-                        @Override
-                        public boolean isDir() {
-                            return localFile.isDir();
-                        }
-
-                        @Override
-                        public Path getPath() {
-                            if (localFile.getPath().toString().length() == localBase.length()) {
-                                return new Path(remoteBase);
-                            }
-                            return new Path(
-                                    remoteBase,
-                                    localFile.getPath().toString().substring(localBase.length()));
-                        }
-                    };
-        }
-        if (remoteFileNum != 0) {
-            System.arraycopy(remoteFiles, 0, fileStatuses, localFileNum, remoteFileNum);
+        List<String> mappingFiles = fileMappingManager.listByPrefix(pathStr);
+        for (String mappingFile : mappingFiles) {
+            String relativePath = mappingFile.substring(pathStr.length());
+            int slashIndex = relativePath.indexOf('/');
+            if (slashIndex == -1) { // direct child
+                Path logicalPath = new Path(mappingFile);
+                // getFileStatus describes the real file (local copy or link source), so its
+                // getPath() would name that file; report the listed child's own path instead.
+                fileStatuses.add(withLogicalPath(getFileStatus(logicalPath), logicalPath));
+            }
         }
-        return fileStatuses;
+        return fileStatuses.toArray(new FileStatus[0]);
     }
+
+    /**
+     * Wraps {@code realStatus} so that every attribute is taken from the real file except {@link
+     * FileStatus#getPath()}, which returns {@code logicalPath}.
+     */
+    private static FileStatus withLogicalPath(FileStatus realStatus, Path logicalPath) {
+        return new FileStatus() {
+            @Override
+            public long getLen() {
+                return realStatus.getLen();
+            }
+
+            @Override
+            public long getBlockSize() {
+                return realStatus.getBlockSize();
+            }
+
+            @Override
+            public short getReplication() {
+                return realStatus.getReplication();
+            }
+
+            @Override
+            public long getModificationTime() {
+                return realStatus.getModificationTime();
+            }
+
+            @Override
+            public long getAccessTime() {
+                return realStatus.getAccessTime();
+            }
+
+            @Override
+            public boolean isDir() {
+                return realStatus.isDir();
+            }
+
+            @Override
+            public Path getPath() {
+                return logicalPath;
+            }
+        };
+    }
 
     @Override
     public boolean delete(Path path, boolean recursive) throws IOException {
-        boolean success = false;
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            success = localFS.delete(localPathTuple.f1, recursive); // delete 
from local
-        }
-        success |= delegateFS.delete(path, recursive); // and delete from 
remote
+        boolean success = fileMappingManager.deleteFile(path, recursive);
         if (fileBasedCache != null) {
+            // only new generated file will put into cache, no need to 
consider file mapping
             fileBasedCache.delete(path);
         }
         return success;
@@ -349,13 +292,7 @@ public class ForStFlinkFileSystem extends FileSystem {
 
     @Override
     public boolean mkdirs(Path path) throws IOException {
-        boolean success = true;
-        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
-        if (localPathTuple.f0) {
-            success &= localFS.mkdirs(localPathTuple.f1);
-        }
-        success &= delegateFS.mkdirs(path);
-        return success;
+        // Directories are never entries of the mapping table, and local misc files are placed
+        // directly under the local base, so directories only need to exist on the delegate.
+        return delegateFS.mkdirs(path);
     }
 
     @Override
@@ -363,20 +300,7 @@ public class ForStFlinkFileSystem extends FileSystem {
         return delegateFS.isDistributedFS();
     }
 
-    private Tuple2<Boolean, Path> tryBuildLocalPath(Path path) {
-        String remotePathStr = path.toString();
-        if (localFileFilter.apply(path.getName()) && 
remotePathStr.startsWith(remoteBase)) {
-            return Tuple2.of(
-                    true,
-                    remotePathStr.length() == remoteBase.length()
-                            ? new Path(localBase)
-                            : new Path(localBase, 
remotePathStr.substring(remoteBase.length())));
-        }
-        return Tuple2.of(false, null);
-    }
-
     public int link(Path src, Path dst) throws IOException {
-        // let forstdb copy the file
-        return -1;
+        // Records dst as another name for the file behind src instead of copying data. The
+        // manager returns 0 on success and -1 when it refuses the link (src equals dst, or dst is
+        // already mapped); NOTE(review): presumably ForSt then copies the file, as it did for the
+        // former unconditional -1 -- confirm.
+        return fileMappingManager.link(src.toString(), dst.toString());
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
new file mode 100644
index 00000000000..d935a5e4d08
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs.filemapping;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A manager to manage file mapping of forst file system, including misc file mapping (remote file
+ * -> local file) and linked mapping (remote file -> remote file). Note, the key/value of mapping
+ * table must be a file path, directories are maintained by file system itself, directories wouldn't
+ * be the key/value of mapping table.
+ *
+ * <p>Each key of the mapping table is a logical file path. Several keys may share one {@link
+ * MappingEntry} (after {@link #link}); the physical file is deleted only when the last key
+ * referring to it is released.
+ *
+ * <p>NOTE(review): the mapping table is a plain {@link HashMap} and no method is synchronized. If
+ * the owning file system can be invoked from several threads at once, access must be serialized by
+ * the caller -- confirm the threading model.
+ */
+public class FileMappingManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileMappingManager.class);
+
+    public static final String SST_SUFFIX = ".sst";
+
+    /** File system of the remote (non-local) files; also used for directory operations. */
+    private final FileSystem fileSystem;
+
+    /** File system holding the local copies of misc (non-SST) files. */
+    private final FileSystem localFileSystem;
+
+    /** Logical file path -> mapping entry. */
+    private final HashMap<String, MappingEntry> mappingTable;
+
+    /** Logical base path; non-SST files created under it are redirected to {@link #localBase}. */
+    private final String remoteBase;
+
+    /** Local directory that receives the redirected misc files. */
+    private final String localBase;
+
+    public FileMappingManager(
+            FileSystem fileSystem,
+            FileSystem localFileSystem,
+            String remoteBase,
+            String localBase) {
+        this.fileSystem = fileSystem;
+        this.localFileSystem = localFileSystem;
+        this.mappingTable = new HashMap<>();
+        this.remoteBase = remoteBase;
+        this.localBase = localBase;
+    }
+
+    /**
+     * Create a mapping entry for a new file.
+     *
+     * <p>A non-SST file whose path starts with {@link #remoteBase} is redirected to a file of the
+     * same name directly under {@link #localBase}; any other file maps to itself on {@link
+     * #fileSystem}. NOTE(review): only the file name is kept for the local copy, so same-named
+     * files in different sub-directories of remoteBase would share one local file.
+     *
+     * @param file logical path of the file to create
+     * @return where the caller should physically create the file
+     * @throws IllegalStateException if {@code file} is already mapped
+     */
+    public RealPath createFile(Path file) {
+        String fileName = file.toString();
+        Preconditions.checkState(
+                !mappingTable.containsKey(fileName),
+                "File %s already exists in the file mapping table.",
+                fileName);
+        if (!fileName.endsWith(SST_SUFFIX) && fileName.startsWith(remoteBase)) {
+            Path localFile = new Path(localBase, file.getName());
+            mappingTable.put(
+                    fileName,
+                    new MappingEntry(1, localFileSystem, localFile.toString(), true, false));
+            return new RealPath(localFile, true);
+        } else {
+            mappingTable.put(fileName, new MappingEntry(1, fileSystem, fileName, false, false));
+            return new RealPath(file, false);
+        }
+    }
+
+    /**
+     * Called by link/copy: make {@code dst} another name of the file behind {@code src}. Directory
+     * link is not supported now.
+     *
+     * @return 0 if the link was recorded, -1 if {@code src} equals {@code dst} or {@code dst} is
+     *     already mapped
+     */
+    public int link(String src, String dst) {
+        if (src.equals(dst) || mappingTable.containsKey(dst)) {
+            return -1;
+        }
+        MappingEntry sourceEntry = mappingTable.get(src);
+        if (sourceEntry == null) {
+            // src is not in the mapping table yet: start tracking it under its own name so that
+            // src and dst share one reference-counted entry.
+            sourceEntry = new MappingEntry(0, fileSystem, src, false, false);
+            sourceEntry.retain();
+            mappingTable.put(src, sourceEntry);
+        }
+        sourceEntry.retain();
+        mappingTable.put(dst, sourceEntry);
+        LOG.trace("link: {} -> {}", dst, src);
+        return 0;
+    }
+
+    /**
+     * Get the real path of a file, the real path maybe a local file or a remote file/dir. Due to
+     * the lazy deletion, if the path is a directory, the exists check may have false positives.
+     *
+     * @return the real path, or {@code null} if {@code path} is not a mapped file (for example a
+     *     directory or an unknown path)
+     */
+    public RealPath realPath(Path path) {
+        MappingEntry entry = mappingTable.get(path.toString());
+        return entry == null ? null : new RealPath(new Path(entry.sourcePath), entry.isLocal);
+    }
+
+    /**
+     * List every mapped logical path located under directory {@code path}, at any depth.
+     *
+     * @param path directory path, with or without a trailing '/'
+     * @return the matching keys, in no particular order
+     */
+    public List<String> listByPrefix(String path) {
+        List<String> linkedPaths = new ArrayList<>();
+        for (Map.Entry<String, MappingEntry> entry : mappingTable.entrySet()) {
+            if (isParentDir(entry.getKey(), path)) {
+                linkedPaths.add(entry.getKey());
+            }
+        }
+        return linkedPaths;
+    }
+
+    /**
+     * Rename a file or a directory.
+     *
+     * <p>1. If {@code src} is a mapped file, only the mapping is moved ("mark rename"); no physical
+     * file is renamed, and an existing mapping at {@code dst} is released. 2. Otherwise {@code src}
+     * is treated as a directory: every mapped file under it (linked and local files alike) is
+     * re-mapped to the same relative path under {@code dst}, {@code dst} is created on the file
+     * system if missing, and {@code src} is then deleted recursively via {@link #deleteFile}.
+     * Renaming a path onto itself is a no-op.
+     *
+     * @param src the source path
+     * @param dst the destination path
+     * @return always return true except for IOException
+     * @throws IOException if creating {@code dst} or deleting {@code src} fails
+     */
+    public boolean renameFile(String src, String dst) throws IOException {
+        if (src.equals(dst)) {
+            // Going through the code below would release the only mapping of the file (file case)
+            // or drop every mapping under the directory (directory case).
+            return true;
+        }
+        MappingEntry srcEntry = mappingTable.get(src);
+        if (srcEntry != null) { // rename file
+            MappingEntry dstEntry = mappingTable.remove(dst);
+            if (dstEntry != null) {
+                dstEntry.release();
+            }
+            mappingTable.remove(src);
+            mappingTable.put(dst, srcEntry);
+        } else { // rename directory = link to dst dir + delete src dir
+            String srcPrefix = withTrailingSlash(src);
+            String dstPrefix = withTrailingSlash(dst);
+
+            // step 1: link all files under src to the same relative path under dst
+            for (String key : listByPrefix(src)) {
+                MappingEntry sourceEntry = mappingTable.get(key);
+                // Substitute only the leading directory: String#replace would also rewrite any
+                // later occurrence of src inside the key.
+                String renamedDst = dstPrefix + key.substring(srcPrefix.length());
+                LOG.trace("rename: {} -> {}", key, renamedDst);
+                sourceEntry.retain();
+                MappingEntry overwritten = mappingTable.put(renamedDst, sourceEntry);
+                if (overwritten != null) {
+                    // As in the file case, an overwritten mapping gives up its reference.
+                    overwritten.release();
+                }
+            }
+
+            Path dstPath = new Path(dst);
+            if (!fileSystem.exists(dstPath)) {
+                fileSystem.mkdirs(dstPath);
+            }
+            // step 2: delete src dir
+            deleteFile(new Path(src), true);
+        }
+        return true;
+    }
+
+    /**
+     * Delete a file or directory from mapping table and file system, the directory deletion may be
+     * deferred.
+     *
+     * @param file to be deleted
+     * @param recursive whether to delete recursively
+     * @return true if the file or directory is deleted successfully, false otherwise.
+     * @throws IOException if {@code file} is not a mapped file and {@code recursive} is false, or
+     *     if an error occurs during deletion
+     */
+    public boolean deleteFile(Path file, boolean recursive) throws IOException {
+        String fileStr = file.toString();
+        MappingEntry entry = mappingTable.get(fileStr);
+        LOG.trace("delete: {}, source:{}", file, entry == null ? "null" : entry.sourcePath);
+        // case 1: a mapped file. Drop this name; the entry deletes the physical file once its last
+        // reference is released.
+        if (entry != null) {
+            mappingTable.remove(fileStr);
+            entry.release();
+            return true;
+        }
+
+        // case 2: anything unmapped is treated as a directory, which needs a recursive delete
+        if (!recursive) {
+            throw new IOException(fileStr + " is a directory, delete failed.");
+        }
+        // Pending deletion of the physical directory: retained once per live entry whose source
+        // file lies under the directory, and executed when the last of them is released.
+        MappingEntry parentEntry = new MappingEntry(0, fileSystem, fileStr, false, recursive);
+
+        // step 2.1: find all matched entries, mark delete dir as parent dir
+        for (Map.Entry<String, MappingEntry> currentEntry : mappingTable.entrySet()) {
+            MappingEntry current = currentEntry.getValue();
+            if (!isParentDir(current.sourcePath, fileStr)) {
+                continue;
+            }
+            MappingEntry oldParentDir = current.parentDir;
+            // Adopt the entry if it has no pending parent yet, or if this directory is an ancestor
+            // of its current one. The replaced (deeper) parent is deliberately not released: this
+            // recursive deletion covers it. Keys linked to the same entry are visited repeatedly,
+            // but only the first visit retains, because afterwards parentDir is parentEntry.
+            if (oldParentDir == null
+                    || (isParentDir(oldParentDir.sourcePath, fileStr)
+                            && !oldParentDir.equals(parentEntry))) {
+                parentEntry.retain();
+                current.parentDir = parentEntry;
+            }
+        }
+
+        boolean status = true;
+        // step 2.2: no mapped source lies under the directory, so it can be deleted right away
+        if (parentEntry.getReferenceCount() == 0) {
+            status = fileSystem.delete(file, recursive);
+        }
+        // step 2.3: release every mapped file whose logical path is under the directory
+        List<String> toRelease = listByPrefix(fileStr);
+        for (String key : toRelease) {
+            mappingTable.remove(key).release();
+        }
+        return status;
+    }
+
+    /** Returns the entry mapped to {@code path}, or {@code null} if it is not mapped. */
+    @VisibleForTesting
+    public MappingEntry mappingEntry(String path) {
+        return mappingTable.get(path);
+    }
+
+    /** Whether {@code path} lies under directory {@code dir} (an empty {@code dir} matches none). */
+    private static boolean isParentDir(String path, String dir) {
+        if (dir.isEmpty()) {
+            return false;
+        }
+        if (dir.charAt(dir.length() - 1) == '/') {
+            return path.startsWith(dir);
+        } else {
+            return path.startsWith(dir + "/");
+        }
+    }
+
+    /** Returns {@code dir}, appending '/' unless it already ends with one. */
+    private static String withTrailingSlash(String dir) {
+        return dir.endsWith("/") ? dir : dir + "/";
+    }
+
+    /** A wrapper of real path. */
+    public static class RealPath {
+        /** Physical path of the file. */
+        public final Path path;
+
+        /** Whether {@link #path} is on the local file system. */
+        public final boolean isLocal;
+
+        public RealPath(Path path, boolean isLocal) {
+            this.path = path;
+            this.isLocal = isLocal;
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
new file mode 100644
index 00000000000..73498e06c51
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/MappingEntry.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs.filemapping;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.asyncprocessing.ReferenceCounted;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A file mapping entry that encapsulates source and destination path. Source Path : dest Path = 1 :
+ * N.
+ *
+ * <p>The entry is reference counted. Once the count reaches zero, {@link #sourcePath} is deleted
+ * from {@link #fileSystem} (recursively if {@link #recursive} is set), and the reference this entry
+ * holds on {@link #parentDir}, if any, is released.
+ */
+public class MappingEntry extends ReferenceCounted {
+    private static final Logger LOG = LoggerFactory.getLogger(MappingEntry.class);
+
+    /** The file system {@link #sourcePath} lives on; used to delete it. */
+    private final FileSystem fileSystem;
+
+    /** The original path of file. */
+    final String sourcePath;
+
+    /** Whether the file is local. */
+    final boolean isLocal;
+
+    /**
+     * Passed to {@link FileSystem#delete} when {@link #sourcePath} is removed; true for entries
+     * that stand for a directory whose deletion is pending.
+     */
+    final boolean recursive;
+
+    /**
+     * Pending deletion of a directory that contains {@link #sourcePath}, set when that directory
+     * is deleted while this entry is still referenced. This entry holds one reference on it.
+     */
+    @Nullable MappingEntry parentDir;
+
+    public MappingEntry(
+            int initReference,
+            FileSystem fileSystem,
+            String sourcePath,
+            boolean isLocal,
+            boolean recursive) {
+        super(initReference);
+        this.fileSystem = fileSystem;
+        this.sourcePath = sourcePath;
+        this.parentDir = null;
+        this.isLocal = isLocal;
+        this.recursive = recursive;
+    }
+
+    @Override
+    protected void referenceCountReachedZero(@Nullable Object o) {
+        // Delete this path before releasing the parent: the parent's release may recursively
+        // delete the enclosing directory, after which deleting this path would be redundant and
+        // could only fail. Both steps are best effort and independent of each other.
+        try {
+            fileSystem.delete(new Path(sourcePath), recursive);
+        } catch (Exception e) {
+            LOG.warn("Failed to delete file {}.", sourcePath, e);
+        }
+        if (parentDir != null) {
+            try {
+                parentDir.release();
+            } catch (Exception e) {
+                LOG.warn(
+                        "Failed to release parent directory {} of {}.",
+                        parentDir.sourcePath,
+                        sourcePath,
+                        e);
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        return sourcePath.equals(((MappingEntry) o).sourcePath);
+    }
+
+    /** Consistent with {@link #equals}: entries are identified by their source path. */
+    @Override
+    public int hashCode() {
+        return sourcePath.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "MappingEntry{sourcePath='"
+                + sourcePath
+                + "', isLocal="
+                + isLocal
+                + ", recursive="
+                + recursive
+                + ", referenceCount="
+                + getReferenceCount()
+                + '}';
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
index 4bfee85de65..fc9a1051df1 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
@@ -243,7 +243,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
                     .forEach(
                             dir -> {
                                 try {
-                                    FileSystem fs = dir.getFileSystem();
+                                    FileSystem fs = getFileSystem(dir);
                                     fs.delete(dir, true);
                                 } catch (IOException ignored) {
                                     logger.warn("Failed to delete transfer 
destination {}", dir);
@@ -253,11 +253,7 @@ public class ForStIncrementalRestoreOperation<K> 
implements ForStRestoreOperatio
     }
 
     private void transferAllStateHandles(List<StateHandleTransferSpec> specs) 
throws Exception {
-        FileSystem forStFs = optionsContainer.getFileSystem();
-        if (forStFs == null) {
-            forStFs = FileSystem.getLocalFileSystem();
-        }
-
+        FileSystem forStFs = getFileSystem(optionsContainer.getBasePath());
         try (ForStStateDataTransfer transfer =
                 new 
ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs)) {
             transfer.transferAllStateDataToDirectory(specs, 
cancelStreamRegistry);
@@ -558,7 +554,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
 
     private void cleanUpPathQuietly(@Nonnull Path path) {
         try {
-            path.getFileSystem().delete(path, true);
+            getFileSystem(forstBasePath).delete(path, true);
         } catch (IOException ex) {
             logger.warn("Failed to clean up path " + path, ex);
         }
@@ -664,7 +660,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
             throws Exception {
 
         Path exportCfBasePath = new Path(forstBasePath, "export-cfs");
-        forstBasePath.getFileSystem().mkdirs(exportCfBasePath);
+        getFileSystem(forstBasePath).mkdirs(exportCfBasePath);
 
         final Map<RegisteredStateMetaInfoBase.Key, 
List<ExportImportFilesMetaData>>
                 exportedColumnFamilyMetaData = new 
HashMap<>(keyedStateHandles.size());
@@ -750,7 +746,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
                         
checkpoint.exportColumnFamily(columnFamilyHandles.get(i), subPathStr);
 
                 FileStatus[] exportedSstFiles =
-                        exportBasePath.getFileSystem().listStatus(new 
Path(exportBasePath, uuid));
+                        getFileSystem(exportBasePath).listStatus(new 
Path(exportBasePath, uuid));
 
                 if (exportedSstFiles != null) {
                     int sstFileCount = 0;
@@ -967,6 +963,14 @@ public class ForStIncrementalRestoreOperation<K> 
implements ForStRestoreOperatio
                 stateHandleSpec.getTransferDestination().toString());
     }
 
+    /** Returns the configured ForSt file system, or the one owning {@code path} if unset. */
+    private FileSystem getFileSystem(Path path) throws IOException {
+        FileSystem configured = optionsContainer.getFileSystem();
+        // No ForSt file system configured: fall back to the file system that owns the path.
+        return configured != null ? configured : path.getFileSystem();
+    }
+
     /** Entity to hold the temporary RocksDB instance created for restore. */
     public static class RestoredDBInstance implements AutoCloseable {
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
new file mode 100644
index 00000000000..081e12d690c
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.state.forst.fs.filemapping.FileMappingManager;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link FileMappingManager}. */
+public class FileMappingManagerTest {
+    /**
+     * Per-test temporary directory, so files created by one test (e.g. {@code source} or {@code
+     * testDir}) cannot leak into another test.
+     */
+    @TempDir java.nio.file.Path tempDir;
+
+    /** Creates a manager using {@code localFS} and {@link #tempDir} for all of its arguments. */
+    private FileMappingManager newFileMappingManager(FileSystem localFS) throws IOException {
+        return new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString());
+    }
+
+    /** Writes a single byte to {@code path}, overwriting any existing file. */
+    private static void writeFile(FileSystem fs, Path path) throws IOException {
+        try (FSDataOutputStream os = fs.create(path, FileSystem.WriteMode.OVERWRITE)) {
+            os.write(233);
+        }
+    }
+
+    @Test
+    void testFileLink() throws IOException {
+        FileSystem localFS = FileSystem.getLocalFileSystem();
+        FileMappingManager fileMappingManager = newFileMappingManager(localFS);
+        String src = tempDir + "/source";
+        writeFile(localFS, new Path(src));
+        String dst = tempDir + "/dst";
+        fileMappingManager.link(src, dst);
+        assertThat(fileMappingManager.realPath(new Path(dst)).path.toString()).isEqualTo(src);
+    }
+
+    @Test
+    void testNestLink() throws IOException {
+        // link b->a
+        // link c->b
+        // link d->c
+        FileSystem localFS = FileSystem.getLocalFileSystem();
+        FileMappingManager fileMappingManager = newFileMappingManager(localFS);
+        String src = tempDir + "/a";
+        writeFile(localFS, new Path(src));
+
+        String dstB = tempDir + "/b";
+        fileMappingManager.link(src, dstB);
+        assertThat(fileMappingManager.realPath(new Path(dstB)).path.toString()).isEqualTo(src);
+        assertThat(fileMappingManager.mappingEntry(dstB).getReferenceCount()).isEqualTo(2);
+
+        String dstC = tempDir + "/c";
+        fileMappingManager.link(dstB, dstC);
+        assertThat(fileMappingManager.realPath(new Path(dstC)).path.toString()).isEqualTo(src);
+        assertThat(fileMappingManager.mappingEntry(dstC).getReferenceCount()).isEqualTo(3);
+
+        String dstD = tempDir + "/d";
+        fileMappingManager.link(dstC, dstD);
+        assertThat(fileMappingManager.realPath(new Path(dstD)).path.toString()).isEqualTo(src);
+        // Check the newly created link itself; its count includes links b, c and d.
+        assertThat(fileMappingManager.mappingEntry(dstD).getReferenceCount()).isEqualTo(4);
+
+        // Linking to an already-mapped destination returns -1.
+        assertThat(fileMappingManager.link(dstD, dstC)).isEqualTo(-1);
+    }
+
+    @Test
+    void testFileDelete() throws IOException {
+        FileSystem localFS = FileSystem.getLocalFileSystem();
+        FileMappingManager fileMappingManager = newFileMappingManager(localFS);
+        String src = tempDir + "/source";
+        writeFile(localFS, new Path(src));
+        String dst = tempDir + "/dst";
+        fileMappingManager.link(src, dst);
+        // delete src
+        fileMappingManager.deleteFile(new Path(src), false);
+        assertThat(localFS.exists(new Path(src))).isTrue();
+
+        // delete dst
+        fileMappingManager.deleteFile(new Path(dst), false);
+        assertThat(localFS.exists(new Path(src))).isFalse();
+    }
+
+    @Test
+    void testDirectoryDelete() throws IOException {
+        FileSystem localFS = FileSystem.getLocalFileSystem();
+        FileMappingManager fileMappingManager = newFileMappingManager(localFS);
+        String testDir = tempDir + "/testDir";
+        localFS.mkdirs(new Path(testDir));
+        String src = testDir + "/source";
+        writeFile(localFS, new Path(src));
+        String dst = tempDir + "/dst";
+        fileMappingManager.link(src, dst);
+
+        // delete testDir
+        fileMappingManager.deleteFile(new Path(testDir), true);
+        assertThat(localFS.exists(new Path(src))).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
+
+        // delete dst
+        fileMappingManager.deleteFile(new Path(dst), false);
+        assertThat(localFS.exists(new Path(src))).isFalse();
+        assertThat(localFS.exists(new Path(testDir))).isFalse();
+    }
+
+    @Test
+    void testDirectoryRename() throws IOException {
+        FileSystem localFS = FileSystem.getLocalFileSystem();
+        FileMappingManager fileMappingManager = newFileMappingManager(localFS);
+        String testDir = tempDir + "/testDir";
+        localFS.mkdirs(new Path(testDir));
+        String src = testDir + "/source";
+        writeFile(localFS, new Path(src));
+
+        String linkedDirTmp = tempDir + "/linkedDir.tmp";
+        localFS.mkdirs(new Path(linkedDirTmp));
+        String linkedSrc = linkedDirTmp + "/source";
+        fileMappingManager.link(src, linkedSrc);
+
+        String linkedDir = tempDir + "/linkedDir";
+        // rename linkedDir.tmp to linkedDir
+        assertThat(fileMappingManager.renameFile(linkedDirTmp, linkedDir)).isTrue();
+        linkedSrc = linkedDir + "/source";
+
+        // delete src
+        assertThat(fileMappingManager.deleteFile(new Path(src), false)).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse();
+        assertThat(localFS.exists(new Path(linkedDir))).isTrue();
+        assertThat(localFS.exists(new Path(src))).isTrue();
+
+        // delete testDir
+        fileMappingManager.deleteFile(new Path(testDir), true);
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDir))).isTrue();
+        assertThat(localFS.exists(new Path(src))).isTrue();
+
+        // delete linkedSrc
+        assertThat(fileMappingManager.deleteFile(new Path(linkedSrc), false)).isTrue();
+        assertThat(localFS.exists(new Path(src))).isFalse();
+        assertThat(localFS.exists(new Path(testDir))).isFalse();
+
+        // delete linkedDir
+        assertThat(fileMappingManager.deleteFile(new Path(linkedDir), true)).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isFalse();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse();
+        assertThat(localFS.exists(new Path(linkedDir))).isFalse();
+        assertThat(localFS.exists(new Path(src))).isFalse();
+    }
+
+    @Test
+    void testCreateFileBeforeRename() throws IOException {
+        FileSystem localFS = FileSystem.getLocalFileSystem();
+        FileMappingManager fileMappingManager = newFileMappingManager(localFS);
+        String testDir = tempDir + "/testDir";
+        localFS.mkdirs(new Path(testDir));
+        String src = testDir + "/source";
+        writeFile(localFS, new Path(src));
+
+        String linkedDirTmp = tempDir + "/linkedDir.tmp";
+        localFS.mkdirs(new Path(linkedDirTmp));
+        String linkedSrc = linkedDirTmp + "/source";
+
+        // link src to linkedDirTmp
+        fileMappingManager.link(src, linkedSrc);
+
+        // create file in linkedDirTmp
+        String create = linkedDirTmp + "/create.sst";
+        FileMappingManager.RealPath realPath = fileMappingManager.createFile(new Path(create));
+        writeFile(localFS, realPath.path);
+
+        String linkedDir = tempDir + "/linkedDir";
+        // rename linkedDir.tmp to linkedDir
+        assertThat(fileMappingManager.renameFile(linkedDirTmp, linkedDir)).isTrue();
+        linkedSrc = linkedDir + "/source";
+
+        // delete src
+        assertThat(fileMappingManager.deleteFile(new Path(src), false)).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDir))).isTrue();
+        assertThat(localFS.exists(new Path(src))).isTrue();
+
+        // delete testDir
+        fileMappingManager.deleteFile(new Path(testDir), true);
+        assertThat(localFS.exists(new Path(testDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
+        assertThat(localFS.exists(new Path(src))).isTrue();
+
+        // delete linkedSrc
+        assertThat(fileMappingManager.deleteFile(new Path(linkedSrc), false)).isTrue();
+        assertThat(localFS.exists(new Path(src))).isFalse();
+        assertThat(localFS.exists(new Path(testDir))).isFalse();
+        assertThat(localFS.exists(new Path(linkedDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isTrue();
+
+        // delete create file
+        String renamedCreated = linkedDir + "/create.sst";
+        assertThat(fileMappingManager.deleteFile(new Path(renamedCreated), false)).isTrue();
+        assertThat(localFS.exists(new Path(renamedCreated))).isFalse();
+        assertThat(localFS.exists(new Path(linkedDir))).isTrue();
+        assertThat(localFS.exists(new Path(linkedDirTmp))).isFalse();
+        assertThat(localFS.exists(new Path(testDir))).isFalse();
+
+        // delete linkedDir
+        assertThat(fileMappingManager.deleteFile(new Path(linkedDir), true)).isTrue();
+        assertThat(localFS.exists(new Path(testDir))).isFalse();
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
index 678fd4db491..c6a937a512f 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
@@ -205,7 +205,10 @@ public class ForStFlinkFileSystemTest {
         os.write(233);
         os.sync();
         os.close();
-        assertThat(fileSystem.exists(new 
org.apache.flink.core.fs.Path(localPath, "CURRENT")))
+        assertThat(
+                        localPath
+                                .getFileSystem()
+                                .exists(new 
org.apache.flink.core.fs.Path(localPath, "CURRENT")))
                 .isTrue();
         ByteBufferReadableFSDataInputStream is =
                 fileSystem.open(new org.apache.flink.core.fs.Path(remotePath, 
"CURRENT"));
@@ -245,7 +248,10 @@ public class ForStFlinkFileSystemTest {
         os.close();
         assertThat(fileSystem.exists(new 
org.apache.flink.core.fs.Path(remotePath, "1.sst")))
                 .isTrue();
-        assertThat(fileSystem.exists(new 
org.apache.flink.core.fs.Path(cachePath, "1.sst")))
+        assertThat(
+                        cachePath
+                                .getFileSystem()
+                                .exists(new 
org.apache.flink.core.fs.Path(cachePath, "1.sst")))
                 .isTrue();
         FileCacheEntry cacheEntry = cache.get(cachePath.getPath() + "/1.sst");
         assertThat(cacheEntry).isNotNull();
@@ -268,7 +274,10 @@ public class ForStFlinkFileSystemTest {
                 .isTrue();
         assertThat(fileSystem.exists(new 
org.apache.flink.core.fs.Path(cachePath, "1.sst")))
                 .isFalse();
-        assertThat(fileSystem.exists(new 
org.apache.flink.core.fs.Path(cachePath, "2.sst")))
+        assertThat(
+                        cachePath
+                                .getFileSystem()
+                                .exists(new 
org.apache.flink.core.fs.Path(cachePath, "2.sst")))
                 .isTrue();
         assertThat(cacheEntry.getReferenceCount()).isEqualTo(0);
         // read after evict

Reply via email to