This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2b5e28b04f98f0b5c8005dc5b2e0fc0a69ce2de Author: Zakelly <[email protected]> AuthorDate: Sun Mar 9 14:13:26 2025 +0800 [FLINK-37437][state/forst] Support file override in wrapped file system --- .../flink/state/forst/fs/ForStFlinkFileSystem.java | 3 +- .../forst/fs/filemapping/FileMappingManager.java | 77 +++++++++++++++------- .../state/forst/fs/FileMappingManagerTest.java | 2 +- .../state/forst/fs/ForStFlinkFileSystemTest.java | 32 +++++++++ 4 files changed, 88 insertions(+), 26 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index d816ab13cc2..4e75d8d6369 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -186,7 +186,8 @@ public class ForStFlinkFileSystem extends FileSystem implements Closeable { public synchronized ByteBufferWritableFSDataOutputStream create( Path dbFilePath, WriteMode overwriteMode) throws IOException { // Create a file in the mapping table - MappingEntry createdMappingEntry = fileMappingManager.createNewFile(dbFilePath); + MappingEntry createdMappingEntry = + fileMappingManager.createNewFile(dbFilePath, overwriteMode == WriteMode.OVERWRITE); // The source must be backed by a file FileBackedMappingEntrySource source = diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java index 5b56c09cb77..db36faa823e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java @@ -61,14 +61,18 @@ public class FileMappingManager { } /** Create a new file in the mapping table. */ - public MappingEntry createNewFile(Path filePath) { + public MappingEntry createNewFile(Path filePath, boolean overwrite) { String key = filePath.toString(); if (FileOwnershipDecider.shouldAlwaysBeLocal(filePath)) { filePath = forceLocalPath(filePath); } return addFileToMappingTable( - key, toUUIDPath(filePath), FileOwnershipDecider.decideForNewFile(filePath), true); + key, + toUUIDPath(filePath), + FileOwnershipDecider.decideForNewFile(filePath), + true, + overwrite); } /** Register a file restored from checkpoints to the mapping table. */ @@ -85,40 +89,65 @@ public class FileMappingManager { private MappingEntry addHandleBackedFileToMappingTable( String key, StreamStateHandle stateHandle, FileOwnership fileOwnership) { MappingEntrySource source = new HandleBackedMappingEntrySource(stateHandle); - MappingEntry existingEntry = getExistingMappingEntry(key, source, fileOwnership); + MappingEntry existingEntry = mappingTable.getOrDefault(key, null); + if (existingEntry != null) { + Preconditions.checkState( + existingEntry.source.equals(source) + && existingEntry.fileOwnership == fileOwnership, + "Try to add a file that is already in mappingTable," + + " but with inconsistent entry. Key: %s, source: %s, fileOwnership: %s. " + + " Entry in table: %s", + key, + source, + fileOwnership, + existingEntry); + + LOG.trace("Skip adding a file that already exists in mapping table: {}", key); + } return existingEntry == null ? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, false)) : existingEntry; } private MappingEntry addFileToMappingTable( - String key, Path filePath, FileOwnership fileOwnership, boolean writing) { + String key, + Path filePath, + FileOwnership fileOwnership, + boolean writing, + boolean overwrite) { MappingEntrySource source = new FileBackedMappingEntrySource(filePath); - MappingEntry existingEntry = getExistingMappingEntry(key, source, fileOwnership); + MappingEntry existingEntry = mappingTable.getOrDefault(key, null); + if (existingEntry != null) { + if (!(existingEntry.source.equals(source) + && existingEntry.fileOwnership == fileOwnership)) { + if (overwrite) { + // if the file is already in the mapping table, but with different source or + // fileOwnership, + // we should remove the existing entry and add a new entry. + LOG.trace( + "Replace the mapping entry for file: {} from {} to {}", + key, + existingEntry.source, + source); + mappingTable.remove(key).release(); + existingEntry = null; + } else { + throw new IllegalStateException( + String.format( + "Try to add a file that is already in mappingTable," + + " but with inconsistent entry. Key: %s, source: %s, fileOwnership: %s. " + + " Entry in table: %s", + key, source, fileOwnership, existingEntry)); + } + } else { + LOG.trace("Skip adding a file that already exists in mapping table: {}", key); + } + } return existingEntry == null ? addMappingEntry(key, new MappingEntry(1, source, fileOwnership, false, writing)) : existingEntry; } - private @Nullable MappingEntry getExistingMappingEntry( - String key, MappingEntrySource source, FileOwnership fileOwnership) { - MappingEntry entryInTable = mappingTable.getOrDefault(key, null); - if (entryInTable != null) { - Preconditions.checkState( - entryInTable.source.equals(source) - && entryInTable.fileOwnership == fileOwnership, - String.format( - "Try to add a file that is already in mappingTable," - + " but with inconsistent entry. Key: %s, source: %s, fileOwnership: %s. " - + " Entry in table: %s", - key, source, fileOwnership, entryInTable)); - - LOG.trace("Skip adding a file that already exists in mapping table: {}", key); - return entryInTable; - } - return null; - } - private MappingEntry addMappingEntry(String key, MappingEntry entry) { mappingTable.put(key, entry); LOG.trace("Add entry to mapping table: {} -> {}", key, entry); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java index ff7875be8cb..84fe596f4aa 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/FileMappingManagerTest.java @@ -55,7 +55,7 @@ public class FileMappingManagerTest { return manager.registerReusedRestoredFile( filePath.toString(), new FileStateHandle(filePath, 0), filePath); } else { - return manager.createNewFile(filePath); + return manager.createNewFile(filePath, false); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java index d52f8edae65..84c9de5ff0f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java @@ -381,6 +381,38 @@ public class ForStFlinkFileSystemTest { assertFileStatusAndBlockLocations(fileSystem, fileSystem.getFileStatus(sstRemotePath1)); } + @Test + public void testOverride() throws IOException { + org.apache.flink.core.fs.Path remotePath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/remote"); + org.apache.flink.core.fs.Path localPath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/local"); + ForStFlinkFileSystem fileSystem = + new ForStFlinkFileSystem( + new ByteBufferReadableLocalFileSystem(), + remotePath.toString(), + localPath.toString(), + null); + fileSystem.mkdirs(remotePath); + fileSystem.mkdirs(localPath); + org.apache.flink.core.fs.Path sstRemotePath1 = + new org.apache.flink.core.fs.Path(remotePath, "1.sst"); + ByteBufferWritableFSDataOutputStream os1 = fileSystem.create(sstRemotePath1); + os1.write(76); + os1.close(); + assertThat(fileSystem.exists(sstRemotePath1)).isTrue(); + ByteBufferReadableFSDataInputStream is = fileSystem.open(sstRemotePath1); + assertThat(is.read()).isEqualTo(76); + + // rewrite + ByteBufferWritableFSDataOutputStream os2 = fileSystem.create(sstRemotePath1); + os2.write(79); + os2.close(); + assertThat(fileSystem.exists(sstRemotePath1)).isTrue(); + is = fileSystem.open(sstRemotePath1); + assertThat(is.read()).isEqualTo(79); + } + private static void assertFileStatusAndBlockLocations( FileSystem fileSystem, FileStatus fileStatus) throws IOException { BlockLocation[] blockLocations =
