This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push: new 1c3738e3e34 [FLINK-37868][state/forst] Respect the `maxTransferBytes` when using path copying in ForSt (#26763) 1c3738e3e34 is described below commit 1c3738e3e34211ff272b69fd384721b4c0f9d7b0 Author: AlexYinHan <alexyin...@gmail.com> AuthorDate: Fri Aug 22 10:39:28 2025 +0800 [FLINK-37868][state/forst] Respect the `maxTransferBytes` when using path copying in ForSt (#26763) --- .../datatransfer/CopyDataTransferStrategy.java | 12 ++++- .../datatransfer/DataTransferStrategyTest.java | 53 ++++++++++++++++++++-- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java index 9c9d188906b..f572c40e52e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java @@ -127,7 +127,8 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { sourceStateHandle, checkpointStreamFactory, stateScope, - tmpResourcesRegistry); + tmpResourcesRegistry, + maxTransferBytes); if (targetStateHandle != null) { LOG.trace("Path-copy file to checkpoint: {} {}", dbFilePath, targetStateHandle); } else { @@ -153,15 +154,22 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { * @param checkpointStreamFactory The checkpoint stream factory * @param stateScope The state scope * @param tmpResourcesRegistry The temporary resources registry + * @param maxTransferBytes The max transfer bytes * @return The target state handle if path-copying is successful, otherwise null */ private @Nullable StreamStateHandle 
tryPathCopyingToCheckpoint( @Nonnull StreamStateHandle sourceHandle, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, - CloseableRegistry tmpResourcesRegistry) { + CloseableRegistry tmpResourcesRegistry, + long maxTransferBytes) { try { + // skip if there is a limit of transfer bytes + if (maxTransferBytes > 0 && maxTransferBytes != Long.MAX_VALUE) { + return null; + } + // copy the file by duplicating if (!checkpointStreamFactory.canFastDuplicate(sourceHandle, stateScope)) { return null; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java index 77d80eee012..73f3f46df4f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java @@ -68,6 +68,7 @@ import java.util.Random; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeFalse; /** Unit test for {@link ReusableDataTransferStrategy}. 
*/ @ExtendWith(ParameterizedTestExtension.class) @@ -169,6 +170,10 @@ public class DataTransferStrategyTest { } private void createDbFiles(List<String> fileNames) throws IOException { + createDbFiles(fileNames, 2048); + } + + private void createDbFiles(List<String> fileNames, int fileLength) throws IOException { for (String fileName : fileNames) { Path dir = FileOwnershipDecider.shouldAlwaysBeLocal(new Path(fileName)) @@ -177,7 +182,7 @@ public class DataTransferStrategyTest { FSDataOutputStream output = dbDelegateFileSystem.create( new Path(dir, fileName), FileSystem.WriteMode.OVERWRITE); - output.write(genRandomBytes(2048)); + output.write(genRandomBytes(fileLength)); output.sync(); output.close(); dbFilePaths.put(fileName, new Path(dir, fileName)); @@ -253,13 +258,18 @@ public class DataTransferStrategyTest { } private DBFilesSnapshot snapshot(DataTransferStrategy strategy) throws IOException { + return snapshot(strategy, Long.MAX_VALUE); + } + + private DBFilesSnapshot snapshot(DataTransferStrategy strategy, long maxTransferBytes) + throws IOException { DBFilesSnapshot snapshot = new DBFilesSnapshot(); for (String fileName : dbFilePaths.keySet()) { Path dbFilePath = dbFilePaths.get(fileName); HandleAndLocalPath handleAndLocalPath = strategy.transferToCheckpoint( dbFilePath, - MAX_TRANSFER_BYTES, + maxTransferBytes, checkpointStreamFactory, CheckpointedStateScope.SHARED, closeableRegistry, @@ -395,8 +405,6 @@ public class DataTransferStrategyTest { @TempDir static java.nio.file.Path tempDir; - private static final long MAX_TRANSFER_BYTES = Long.MAX_VALUE; - private DBFilesContainer createDb( JobID jobID, int subtaskIndex, @@ -638,4 +646,41 @@ public class DataTransferStrategyTest { lastSnapshot.checkFilesExist(false, dbDirUnderCpDir); lastSnapshot.checkFilesExist(true, false); } + + private void createDbFilesWithExactSize( + DBFilesContainer db, List<String> newDbFileNames, int fileLength) throws IOException { + db.createDbFiles(newDbFileNames, fileLength); + 
for (String fileName : newDbFileNames) { + long fileLen = + db.dbDelegateFileSystem.getFileStatus(db.dbFilePaths.get(fileName)).getLen(); + assertThat(fileLen).isEqualTo(fileLength); + } + db.checkDbFilesExist(newDbFileNames); + } + + @TestTemplate + public void testSnapshotWithMaxTransferBytes() throws IOException { + FileNameGenerator fileNameGenerator = new FileNameGenerator(); + JobID jobID = new JobID(); + Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy = + createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode, pathCopying); + DBFilesContainer db = dbAndStrategy.f0; + DataTransferStrategy strategy = dbAndStrategy.f1; + + // skip the cases when db files are reused for snapshots + assumeFalse(strategy instanceof ReusableDataTransferStrategy); + System.out.println(strategy.getClass()); + + // create new files for DB + createDbFilesWithExactSize(db, fileNameGenerator.genMultipleFileNames(4, 4), 2048); + createDbFilesWithExactSize(db, fileNameGenerator.genMultipleFileNames(4, 4), 128); + + // create a snapshot + DBFilesSnapshot lastSnapshot = db.snapshot(strategy, 1024); + db.assertFilesReusedToCheckpoint(lastSnapshot.getStateHandles()); + + for (Tuple2<Path, HandleAndLocalPath> tuple : lastSnapshot.dbSnapshotFiles.values()) { + assertThat(tuple.f1.getStateSize()).isLessThanOrEqualTo(1024); + } + } }