This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dccb782134b42f41b2a595b8b3e27851c495a3fd Author: yinhan.yh <yinhan...@alibaba-inc.com> AuthorDate: Fri Jan 17 18:12:21 2025 +0800 [FLINK-37021][state/forst] Leverage fast-copying of FileSystems for snapshots --- .../datatransfer/CopyDataTransferStrategy.java | 107 ++++++++++++++++++--- 1 file changed, 96 insertions(+), 11 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java index 9a375e0e01f2..3423318f8d89 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java @@ -26,17 +26,26 @@ import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; +import org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource; +import org.apache.flink.state.forst.fs.filemapping.HandleBackedMappingEntrySource; +import org.apache.flink.state.forst.fs.filemapping.MappingEntry; +import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource; import org.apache.flink.util.IOUtils; - -import java.io.IOException; -import java.io.InputStream; +import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; + /** * Data transfer strategy for ForSt DB without a remote DB path. It always copies the file to/from * checkpoint storage when transferring data. @@ -54,7 +63,7 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { } @Override - public IncrementalKeyedStateHandle.HandleAndLocalPath transferToCheckpoint( + public HandleAndLocalPath transferToCheckpoint( Path dbFilePath, long maxTransferBytes, CheckpointStreamFactory checkpointStreamFactory, @@ -63,7 +72,6 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { CloseableRegistry tmpResourcesRegistry) throws IOException { - LOG.trace("Copy file to checkpoint: {}", dbFilePath); if (maxTransferBytes < 0) { // Means transfer whole file to checkpoint storage. maxTransferBytes = Long.MAX_VALUE; @@ -92,7 +100,7 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { return "CopyDataTransferStrategy{" + ", dbFileSystem=" + dbFileSystem + '}'; } - private static IncrementalKeyedStateHandle.HandleAndLocalPath copyFileToCheckpoint( + private static HandleAndLocalPath copyFileToCheckpoint( FileSystem dbFileSystem, Path filePath, long maxTransferBytes, @@ -101,7 +109,87 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException { + StreamStateHandle handleByDuplicating = + duplicateFileToCheckpoint( + dbFileSystem, filePath, checkpointStreamFactory, stateScope); + if (handleByDuplicating != null) { + LOG.trace("Duplicate file to checkpoint: {} {}", filePath, handleByDuplicating); + return HandleAndLocalPath.of(handleByDuplicating, filePath.getName()); + } + + HandleAndLocalPath handleAndLocalPath = + HandleAndLocalPath.of( + writeFileToCheckpoint( + dbFileSystem, + filePath, + maxTransferBytes, + checkpointStreamFactory, + stateScope, + closeableRegistry, + tmpResourcesRegistry), + filePath.getName()); + LOG.trace("Write file to checkpoint: {}, {}", filePath, handleAndLocalPath.getHandle()); + return handleAndLocalPath; + } + + /** + * Duplicate file to checkpoint storage by calling {@link CheckpointStreamFactory#duplicate} if + * possible. + */ + private static @Nullable StreamStateHandle duplicateFileToCheckpoint( + FileSystem dbFileSystem, + Path filePath, + CheckpointStreamFactory checkpointStreamFactory, + CheckpointedStateScope stateScope) + throws IOException { + + StreamStateHandle stateHandle = getStateHandle(dbFileSystem, filePath); + + if (!checkpointStreamFactory.canFastDuplicate(stateHandle, stateScope)) { + return null; + } + + List<StreamStateHandle> result = + checkpointStreamFactory.duplicate( + Collections.singletonList(stateHandle), stateScope); + return result.get(0); + } + private static StreamStateHandle getStateHandle(FileSystem dbFileSystem, Path filePath) + throws IOException { + Path sourceFilePath = filePath; + if (dbFileSystem instanceof ForStFlinkFileSystem) { + MappingEntry mappingEntry = + ((ForStFlinkFileSystem) dbFileSystem).getMappingEntry(filePath); + Preconditions.checkNotNull( + mappingEntry, "File mapping entry not found for %s", filePath); + + MappingEntrySource source = mappingEntry.getSource(); + if (source instanceof HandleBackedMappingEntrySource) { + // return the state handle stored in MappingEntry + return ((HandleBackedMappingEntrySource) source).getStateHandle(); + } else { + // use file path stored in MappingEntry + sourceFilePath = ((FileBackedMappingEntrySource) source).getFilePath(); + } + } + + // construct a FileStateHandle base on source file + FileSystem sourceFileSystem = sourceFilePath.getFileSystem(); + long fileLength = sourceFileSystem.getFileStatus(sourceFilePath).getLen(); + return new FileStateHandle(sourceFilePath, fileLength); + } + + /** Write file to checkpoint storage through {@link CheckpointStateOutputStream}. */ + private static @Nullable StreamStateHandle writeFileToCheckpoint( + FileSystem dbFileSystem, + Path filePath, + long maxTransferBytes, + CheckpointStreamFactory checkpointStreamFactory, + CheckpointedStateScope stateScope, + CloseableRegistry closeableRegistry, + CloseableRegistry tmpResourcesRegistry) + throws IOException { InputStream inputStream = null; CheckpointStateOutputStream outputStream = null; @@ -137,14 +225,11 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { tmpResourcesRegistry.registerCloseable( () -> StateUtil.discardStateObjectQuietly(result)); - return IncrementalKeyedStateHandle.HandleAndLocalPath.of(result, filePath.getName()); - + return result; } finally { - if (closeableRegistry.unregisterCloseable(inputStream)) { IOUtils.closeQuietly(inputStream); } - if (closeableRegistry.unregisterCloseable(outputStream)) { IOUtils.closeQuietly(outputStream); }