This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 67b02e489419d6d8a55af0b8781f80eab022e650
Author: Zakelly <[email protected]>
AuthorDate: Sun Mar 9 14:13:26 2025 +0800

    [FLINK-37437][state/forst] Support file override in wrapped file system
---
 .../flink/state/forst/fs/ForStFlinkFileSystem.java |  3 +-
 .../forst/fs/filemapping/FileMappingManager.java   | 77 +++++++++++++++-------
 .../state/forst/fs/FileMappingManagerTest.java     |  2 +-
 .../state/forst/fs/ForStFlinkFileSystemTest.java   | 32 +++++++++
 4 files changed, 88 insertions(+), 26 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
index d816ab13cc2..4e75d8d6369 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java
@@ -186,7 +186,8 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable {
     public synchronized ByteBufferWritableFSDataOutputStream create(
             Path dbFilePath, WriteMode overwriteMode) throws IOException {
         // Create a file in the mapping table
-        MappingEntry createdMappingEntry = fileMappingManager.createNewFile(dbFilePath);
+        MappingEntry createdMappingEntry =
+                fileMappingManager.createNewFile(dbFilePath, overwriteMode == WriteMode.OVERWRITE);
 
         // The source must be backed by a file
         FileBackedMappingEntrySource source =
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
index 5b56c09cb77..db36faa823e 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
@@ -61,14 +61,18 @@ public class FileMappingManager {
     }
 
     /** Create a new file in the mapping table. */
-    public MappingEntry createNewFile(Path filePath) {
+    public MappingEntry createNewFile(Path filePath, boolean overwrite) {
         String key = filePath.toString();
         if (FileOwnershipDecider.shouldAlwaysBeLocal(filePath)) {
             filePath = forceLocalPath(filePath);
         }
 
         return addFileToMappingTable(
-                key, toUUIDPath(filePath), FileOwnershipDecider.decideForNewFile(filePath), true);
+                key,
+                toUUIDPath(filePath),
+                FileOwnershipDecider.decideForNewFile(filePath),
+                true,
+                overwrite);
     }
 
     /** Register a file restored from checkpoints to the mapping table. */
@@ -85,40 +89,65 @@ public class FileMappingManager {
     private MappingEntry addHandleBackedFileToMappingTable(
             String key, StreamStateHandle stateHandle, FileOwnership fileOwnership) {
         MappingEntrySource source = new HandleBackedMappingEntrySource(stateHandle);
-        MappingEntry existingEntry = getExistingMappingEntry(key, source, fileOwnership);
+        MappingEntry existingEntry = mappingTable.getOrDefault(key, null);
+        if (existingEntry != null) {
+            Preconditions.checkState(
+                    existingEntry.source.equals(source)
+                            && existingEntry.fileOwnership == fileOwnership,
+                    "Try to add a file that is already in mappingTable,"
+                            + " but with inconsistent entry. Key: %s, source: %s, fileOwnership: %s. "
+                            + " Entry in table: %s",
+                    key,
+                    source,
+                    fileOwnership,
+                    existingEntry);
+
+            LOG.trace("Skip adding a file that already exists in mapping table: {}", key);
+        }
         return existingEntry == null
                 ? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, false))
                 : existingEntry;
     }
 
     private MappingEntry addFileToMappingTable(
-            String key, Path filePath, FileOwnership fileOwnership, boolean writing) {
+            String key,
+            Path filePath,
+            FileOwnership fileOwnership,
+            boolean writing,
+            boolean overwrite) {
         MappingEntrySource source = new FileBackedMappingEntrySource(filePath);
-        MappingEntry existingEntry = getExistingMappingEntry(key, source, fileOwnership);
+        MappingEntry existingEntry = mappingTable.getOrDefault(key, null);
+        if (existingEntry != null) {
+            if (!(existingEntry.source.equals(source)
+                    && existingEntry.fileOwnership == fileOwnership)) {
+                if (overwrite) {
+                    // if the file is already in the mapping table, but with different source or
+                    // fileOwnership,
+                    // we should remove the existing entry and add a new entry.
+                    LOG.trace(
+                            "Replace the mapping entry for file: {} from {} to {}",
+                            key,
+                            existingEntry.source,
+                            source);
+                    mappingTable.remove(key).release();
+                    existingEntry = null;
+                } else {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "Try to add a file that is already in mappingTable,"
+                                            + " but with inconsistent entry. Key: %s, source: %s, fileOwnership: %s. "
+                                            + " Entry in table: %s",
+                                    key, source, fileOwnership, existingEntry));
+                }
+            } else {
+                LOG.trace("Skip adding a file that already exists in mapping table: {}", key);
+            }
+        }
         return existingEntry == null
                 ? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, writing))
                 : existingEntry;
     }
 
-    private @Nullable MappingEntry getExistingMappingEntry(
-            String key, MappingEntrySource source, FileOwnership fileOwnership) {
-        MappingEntry entryInTable = mappingTable.getOrDefault(key, null);
-        if (entryInTable != null) {
-            Preconditions.checkState(
-                    entryInTable.source.equals(source)
-                            && entryInTable.fileOwnership == fileOwnership,
-                    String.format(
-                            "Try to add a file that is already in mappingTable,"
-                                    + " but with inconsistent entry. Key: %s, source: %s, fileOwnership: %s. "
-                                    + " Entry in table: %s",
-                            key, source, fileOwnership, entryInTable));
-
-            LOG.trace("Skip adding a file that already exists in mapping table: {}", key);
-            return entryInTable;
-        }
-        return null;
-    }
-
     private MappingEntry addMappingEntry(String key, MappingEntry entry) {
         mappingTable.put(key, entry);
         LOG.trace("Add entry to mapping table: {} -> {}", key, entry);
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
index ff7875be8cb..84fe596f4aa 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java
@@ -55,7 +55,7 @@ public class FileMappingManagerTest {
             return manager.registerReusedRestoredFile(
                     filePath.toString(), new FileStateHandle(filePath, 0), filePath);
         } else {
-            return manager.createNewFile(filePath);
+            return manager.createNewFile(filePath, false);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
index d52f8edae65..84c9de5ff0f 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java
@@ -381,6 +381,38 @@ public class ForStFlinkFileSystemTest {
         assertFileStatusAndBlockLocations(fileSystem, fileSystem.getFileStatus(sstRemotePath1));
     }
 
+    @Test
+    public void testOverride() throws IOException {
+        org.apache.flink.core.fs.Path remotePath =
+                new org.apache.flink.core.fs.Path(tempDir.toString() + "/remote");
+        org.apache.flink.core.fs.Path localPath =
+                new org.apache.flink.core.fs.Path(tempDir.toString() + "/local");
+        ForStFlinkFileSystem fileSystem =
+                new ForStFlinkFileSystem(
+                        new ByteBufferReadableLocalFileSystem(),
+                        remotePath.toString(),
+                        localPath.toString(),
+                        null);
+        fileSystem.mkdirs(remotePath);
+        fileSystem.mkdirs(localPath);
+        org.apache.flink.core.fs.Path sstRemotePath1 =
+                new org.apache.flink.core.fs.Path(remotePath, "1.sst");
+        ByteBufferWritableFSDataOutputStream os1 = fileSystem.create(sstRemotePath1);
+        os1.write(76);
+        os1.close();
+        assertThat(fileSystem.exists(sstRemotePath1)).isTrue();
+        ByteBufferReadableFSDataInputStream is = fileSystem.open(sstRemotePath1);
+        assertThat(is.read()).isEqualTo(76);
+
+        // rewrite
+        ByteBufferWritableFSDataOutputStream os2 = fileSystem.create(sstRemotePath1);
+        os2.write(79);
+        os2.close();
+        assertThat(fileSystem.exists(sstRemotePath1)).isTrue();
+        is = fileSystem.open(sstRemotePath1);
+        assertThat(is.read()).isEqualTo(79);
+    }
+
     private static void assertFileStatusAndBlockLocations(
             FileSystem fileSystem, FileStatus fileStatus) throws IOException {
         BlockLocation[] blockLocations =

Reply via email to