This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 1c3738e3e34 [FLINK-37868][state/forst] Respect the `maxTransferBytes` when using path copying in ForSt (#26763)
1c3738e3e34 is described below

commit 1c3738e3e34211ff272b69fd384721b4c0f9d7b0
Author: AlexYinHan <alexyin...@gmail.com>
AuthorDate: Fri Aug 22 10:39:28 2025 +0800

    [FLINK-37868][state/forst] Respect the `maxTransferBytes` when using path copying in ForSt (#26763)
---
 .../datatransfer/CopyDataTransferStrategy.java     | 12 ++++-
 .../datatransfer/DataTransferStrategyTest.java     | 53 ++++++++++++++++++++--
 2 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
index 9c9d188906b..f572c40e52e 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
@@ -127,7 +127,8 @@ public class CopyDataTransferStrategy extends DataTransferStrategy {
                         sourceStateHandle,
                         checkpointStreamFactory,
                         stateScope,
-                        tmpResourcesRegistry);
+                        tmpResourcesRegistry,
+                        maxTransferBytes);
         if (targetStateHandle != null) {
             LOG.trace("Path-copy file to checkpoint: {} {}", dbFilePath, 
targetStateHandle);
         } else {
@@ -153,15 +154,22 @@ public class CopyDataTransferStrategy extends DataTransferStrategy {
      * @param checkpointStreamFactory The checkpoint stream factory
      * @param stateScope The state scope
      * @param tmpResourcesRegistry The temporary resources registry
+     * @param maxTransferBytes The max transfer bytes
      * @return The target state handle if path-copying is successful, 
otherwise null
      */
     private @Nullable StreamStateHandle tryPathCopyingToCheckpoint(
             @Nonnull StreamStateHandle sourceHandle,
             CheckpointStreamFactory checkpointStreamFactory,
             CheckpointedStateScope stateScope,
-            CloseableRegistry tmpResourcesRegistry) {
+            CloseableRegistry tmpResourcesRegistry,
+            long maxTransferBytes) {
 
         try {
+            // skip if there is a limit of transfer bytes
+            if (maxTransferBytes > 0 && maxTransferBytes != Long.MAX_VALUE) {
+                return null;
+            }
+
             // copy the file by duplicating
             if (!checkpointStreamFactory.canFastDuplicate(sourceHandle, 
stateScope)) {
                 return null;
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
index 77d80eee012..73f3f46df4f 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
@@ -68,6 +68,7 @@ import java.util.Random;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
 
 /** Unit test for {@link ReusableDataTransferStrategy}. */
 @ExtendWith(ParameterizedTestExtension.class)
@@ -169,6 +170,10 @@ public class DataTransferStrategyTest {
         }
 
         private void createDbFiles(List<String> fileNames) throws IOException {
+            createDbFiles(fileNames, 2048);
+        }
+
+        private void createDbFiles(List<String> fileNames, int fileLength) 
throws IOException {
             for (String fileName : fileNames) {
                 Path dir =
                         FileOwnershipDecider.shouldAlwaysBeLocal(new 
Path(fileName))
@@ -177,7 +182,7 @@ public class DataTransferStrategyTest {
                 FSDataOutputStream output =
                         dbDelegateFileSystem.create(
                                 new Path(dir, fileName), 
FileSystem.WriteMode.OVERWRITE);
-                output.write(genRandomBytes(2048));
+                output.write(genRandomBytes(fileLength));
                 output.sync();
                 output.close();
                 dbFilePaths.put(fileName, new Path(dir, fileName));
@@ -253,13 +258,18 @@ public class DataTransferStrategyTest {
         }
 
         private DBFilesSnapshot snapshot(DataTransferStrategy strategy) throws 
IOException {
+            return snapshot(strategy, Long.MAX_VALUE);
+        }
+
+        private DBFilesSnapshot snapshot(DataTransferStrategy strategy, long 
maxTransferBytes)
+                throws IOException {
             DBFilesSnapshot snapshot = new DBFilesSnapshot();
             for (String fileName : dbFilePaths.keySet()) {
                 Path dbFilePath = dbFilePaths.get(fileName);
                 HandleAndLocalPath handleAndLocalPath =
                         strategy.transferToCheckpoint(
                                 dbFilePath,
-                                MAX_TRANSFER_BYTES,
+                                maxTransferBytes,
                                 checkpointStreamFactory,
                                 CheckpointedStateScope.SHARED,
                                 closeableRegistry,
@@ -395,8 +405,6 @@ public class DataTransferStrategyTest {
 
     @TempDir static java.nio.file.Path tempDir;
 
-    private static final long MAX_TRANSFER_BYTES = Long.MAX_VALUE;
-
     private DBFilesContainer createDb(
             JobID jobID,
             int subtaskIndex,
@@ -638,4 +646,41 @@ public class DataTransferStrategyTest {
         lastSnapshot.checkFilesExist(false, dbDirUnderCpDir);
         lastSnapshot.checkFilesExist(true, false);
     }
+
+    private void createDbFilesWithExactSize(
+            DBFilesContainer db, List<String> newDbFileNames, int fileLength) 
throws IOException {
+        db.createDbFiles(newDbFileNames, fileLength);
+        for (String fileName : newDbFileNames) {
+            long fileLen =
+                    
db.dbDelegateFileSystem.getFileStatus(db.dbFilePaths.get(fileName)).getLen();
+            assertThat(fileLen).isEqualTo(fileLength);
+        }
+        db.checkDbFilesExist(newDbFileNames);
+    }
+
+    @TestTemplate
+    public void testSnapshotWithMaxTransferBytes() throws IOException {
+        FileNameGenerator fileNameGenerator = new FileNameGenerator();
+        JobID jobID = new JobID();
+        Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy =
+                createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode, pathCopying);
+        DBFilesContainer db = dbAndStrategy.f0;
+        DataTransferStrategy strategy = dbAndStrategy.f1;
+
+        // skip the cases when db files are reused for snapshots
+        assumeFalse(strategy instanceof ReusableDataTransferStrategy);
+        System.out.println(strategy.getClass());
+
+        // create new files for DB
+        createDbFilesWithExactSize(db, 
fileNameGenerator.genMultipleFileNames(4, 4), 2048);
+        createDbFilesWithExactSize(db, 
fileNameGenerator.genMultipleFileNames(4, 4), 128);
+
+        // create a snapshot
+        DBFilesSnapshot lastSnapshot = db.snapshot(strategy, 1024);
+        db.assertFilesReusedToCheckpoint(lastSnapshot.getStateHandles());
+
+        for (Tuple2<Path, HandleAndLocalPath> tuple : 
lastSnapshot.dbSnapshotFiles.values()) {
+            assertThat(tuple.f1.getStateSize()).isLessThanOrEqualTo(1024);
+        }
+    }
 }

Reply via email to