This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dccb782134b42f41b2a595b8b3e27851c495a3fd
Author: yinhan.yh <yinhan...@alibaba-inc.com>
AuthorDate: Fri Jan 17 18:12:21 2025 +0800

    [FLINK-37021][state/forst] Leverage fast-copying of FileSystems for snapshots
---
 .../datatransfer/CopyDataTransferStrategy.java     | 107 ++++++++++++++++++---
 1 file changed, 96 insertions(+), 11 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
index 9a375e0e01f2..3423318f8d89 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
@@ -26,17 +26,26 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
+import 
org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource;
+import 
org.apache.flink.state.forst.fs.filemapping.HandleBackedMappingEntrySource;
+import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
+import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource;
 import org.apache.flink.util.IOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Data transfer strategy for ForSt DB without a remote DB path. It always copies the file to/from
  * checkpoint storage when transferring data.
@@ -54,7 +63,7 @@ public class CopyDataTransferStrategy extends 
DataTransferStrategy {
     }
 
     @Override
-    public IncrementalKeyedStateHandle.HandleAndLocalPath transferToCheckpoint(
+    public HandleAndLocalPath transferToCheckpoint(
             Path dbFilePath,
             long maxTransferBytes,
             CheckpointStreamFactory checkpointStreamFactory,
@@ -63,7 +72,6 @@ public class CopyDataTransferStrategy extends 
DataTransferStrategy {
             CloseableRegistry tmpResourcesRegistry)
             throws IOException {
 
-        LOG.trace("Copy file to checkpoint: {}", dbFilePath);
         if (maxTransferBytes < 0) {
             // Means transfer whole file to checkpoint storage.
             maxTransferBytes = Long.MAX_VALUE;
@@ -92,7 +100,7 @@ public class CopyDataTransferStrategy extends 
DataTransferStrategy {
         return "CopyDataTransferStrategy{" + ", dbFileSystem=" + dbFileSystem 
+ '}';
     }
 
-    private static IncrementalKeyedStateHandle.HandleAndLocalPath 
copyFileToCheckpoint(
+    private static HandleAndLocalPath copyFileToCheckpoint(
             FileSystem dbFileSystem,
             Path filePath,
             long maxTransferBytes,
@@ -101,7 +109,87 @@ public class CopyDataTransferStrategy extends 
DataTransferStrategy {
             CloseableRegistry closeableRegistry,
             CloseableRegistry tmpResourcesRegistry)
             throws IOException {
+        StreamStateHandle handleByDuplicating =
+                duplicateFileToCheckpoint(
+                        dbFileSystem, filePath, checkpointStreamFactory, 
stateScope);
+        if (handleByDuplicating != null) {
+            LOG.trace("Duplicate file to checkpoint: {} {}", filePath, 
handleByDuplicating);
+            return HandleAndLocalPath.of(handleByDuplicating, 
filePath.getName());
+        }
+
+        HandleAndLocalPath handleAndLocalPath =
+                HandleAndLocalPath.of(
+                        writeFileToCheckpoint(
+                                dbFileSystem,
+                                filePath,
+                                maxTransferBytes,
+                                checkpointStreamFactory,
+                                stateScope,
+                                closeableRegistry,
+                                tmpResourcesRegistry),
+                        filePath.getName());
+        LOG.trace("Write file to checkpoint: {}, {}", filePath, 
handleAndLocalPath.getHandle());
+        return handleAndLocalPath;
+    }
+
+    /**
+     * Duplicate file to checkpoint storage by calling {@link CheckpointStreamFactory#duplicate} if
+     * possible.
+     */
+    private static @Nullable StreamStateHandle duplicateFileToCheckpoint(
+            FileSystem dbFileSystem,
+            Path filePath,
+            CheckpointStreamFactory checkpointStreamFactory,
+            CheckpointedStateScope stateScope)
+            throws IOException {
+
+        StreamStateHandle stateHandle = getStateHandle(dbFileSystem, filePath);
+
+        if (!checkpointStreamFactory.canFastDuplicate(stateHandle, 
stateScope)) {
+            return null;
+        }
+
+        List<StreamStateHandle> result =
+                checkpointStreamFactory.duplicate(
+                        Collections.singletonList(stateHandle), stateScope);
+        return result.get(0);
+    }
 
+    private static StreamStateHandle getStateHandle(FileSystem dbFileSystem, 
Path filePath)
+            throws IOException {
+        Path sourceFilePath = filePath;
+        if (dbFileSystem instanceof ForStFlinkFileSystem) {
+            MappingEntry mappingEntry =
+                    ((ForStFlinkFileSystem) 
dbFileSystem).getMappingEntry(filePath);
+            Preconditions.checkNotNull(
+                    mappingEntry, "File mapping entry not found for %s", 
filePath);
+
+            MappingEntrySource source = mappingEntry.getSource();
+            if (source instanceof HandleBackedMappingEntrySource) {
+                // return the state handle stored in MappingEntry
+                return ((HandleBackedMappingEntrySource) 
source).getStateHandle();
+            } else {
+                // use file path stored in MappingEntry
+                sourceFilePath = ((FileBackedMappingEntrySource) 
source).getFilePath();
+            }
+        }
+
+        // construct a FileStateHandle based on the source file
+        FileSystem sourceFileSystem = sourceFilePath.getFileSystem();
+        long fileLength = 
sourceFileSystem.getFileStatus(sourceFilePath).getLen();
+        return new FileStateHandle(sourceFilePath, fileLength);
+    }
+
+    /** Write file to checkpoint storage through {@link CheckpointStateOutputStream}. */
+    private static @Nullable StreamStateHandle writeFileToCheckpoint(
+            FileSystem dbFileSystem,
+            Path filePath,
+            long maxTransferBytes,
+            CheckpointStreamFactory checkpointStreamFactory,
+            CheckpointedStateScope stateScope,
+            CloseableRegistry closeableRegistry,
+            CloseableRegistry tmpResourcesRegistry)
+            throws IOException {
         InputStream inputStream = null;
         CheckpointStateOutputStream outputStream = null;
 
@@ -137,14 +225,11 @@ public class CopyDataTransferStrategy extends 
DataTransferStrategy {
             tmpResourcesRegistry.registerCloseable(
                     () -> StateUtil.discardStateObjectQuietly(result));
 
-            return IncrementalKeyedStateHandle.HandleAndLocalPath.of(result, 
filePath.getName());
-
+            return result;
         } finally {
-
             if (closeableRegistry.unregisterCloseable(inputStream)) {
                 IOUtils.closeQuietly(inputStream);
             }
-
             if (closeableRegistry.unregisterCloseable(outputStream)) {
                 IOUtils.closeQuietly(outputStream);
             }

Reply via email to