This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push: new da38a9810b8 [FLINK-37867][state/forst] Ensure files of half-uploaded checkpoints are cleaned when using path copying (#26759) da38a9810b8 is described below commit da38a9810b816c59b14cbedf514d13bb47d094a5 Author: AlexYinHan <alexyin...@gmail.com> AuthorDate: Thu Jul 17 11:21:30 2025 +0800 [FLINK-37867][state/forst] Ensure files of half-uploaded checkpoints are cleaned when using path copying (#26759) --- .../datatransfer/CopyDataTransferStrategy.java | 22 ++- .../datatransfer/DataTransferStrategyTest.java | 169 +++++++++++++++------ 2 files changed, 138 insertions(+), 53 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java index 6686ba41878..9c9d188906b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java @@ -123,7 +123,11 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { // Try path-copying first. If failed, fallback to bytes-copying StreamStateHandle targetStateHandle = - tryPathCopyingToCheckpoint(sourceStateHandle, checkpointStreamFactory, stateScope); + tryPathCopyingToCheckpoint( + sourceStateHandle, + checkpointStreamFactory, + stateScope, + tmpResourcesRegistry); if (targetStateHandle != null) { LOG.trace("Path-copy file to checkpoint: {} {}", dbFilePath, targetStateHandle); } else { @@ -148,22 +152,29 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { * @param sourceHandle The source state handle * @param checkpointStreamFactory The checkpoint stream factory * @param stateScope The state scope + * @param tmpResourcesRegistry The temporary resources registry * @return The target state handle if path-copying is successful, otherwise null */ private @Nullable StreamStateHandle tryPathCopyingToCheckpoint( @Nonnull StreamStateHandle sourceHandle, CheckpointStreamFactory checkpointStreamFactory, - CheckpointedStateScope stateScope) { + CheckpointedStateScope stateScope, + CloseableRegistry tmpResourcesRegistry) { try { + // copy the file by duplicating if (!checkpointStreamFactory.canFastDuplicate(sourceHandle, stateScope)) { return null; } - List<StreamStateHandle> result = checkpointStreamFactory.duplicate( Collections.singletonList(sourceHandle), stateScope); - return result.get(0); + StreamStateHandle resultStateHandle = result.get(0); + + // register the clean-up logic of the uploaded file + tmpResourcesRegistry.registerCloseable( + () -> StateUtil.discardStateObjectQuietly(resultStateHandle)); + return resultStateHandle; } catch (Exception e) { LOG.warn("Failed to duplicate file to checkpoint: {} {}", sourceHandle, stateScope, e); } @@ -179,6 +190,7 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException { + // copy the file by bytes InputStream inputStream = null; CheckpointStateOutputStream outputStream = null; @@ -211,6 +223,8 @@ public class CopyDataTransferStrategy extends DataTransferStrategy { } else { result = null; } + + // register the clean-up logic of the uploaded file tmpResourcesRegistry.registerCloseable( () -> StateUtil.discardStateObjectQuietly(result)); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java index 459348e49b5..77d80eee012 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java @@ -24,7 +24,9 @@ import org.apache.flink.core.execution.RecoveryClaimMode; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.ICloseableRegistry; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.PathsCopyingFileSystem; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -57,13 +59,12 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -89,14 +90,38 @@ public class DataTransferStrategyTest { } } + /** Dummy local file system that 'implements' path copying. */ + static class TestPathsCopyingLocalFileSystem extends LocalFileSystem + implements PathsCopyingFileSystem { + + TestPathsCopyingLocalFileSystem() { + super(); + } + + @Override + public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry) + throws IOException { + for (CopyRequest request : requests) { + Path source = request.getSource(); + Path destination = request.getDestination(); + FileUtils.copy(source, destination, false); + } + } + + @Override + public boolean canCopyPaths(Path source, Path destination) throws IOException { + return true; + } + } + /** Container for DB files. */ static class DBFilesContainer { static CheckpointPathsContainer cpPathContainer = null; - FileSystem realFileSystem; + private Random rnd = new Random(); + FileSystem realFileSystem; Path dbCheckpointBase; - protected CheckpointStreamFactory checkpointStreamFactory; protected CloseableRegistry closeableRegistry; @@ -109,8 +134,12 @@ public class DataTransferStrategyTest { Map<String, Path> dbFilePaths = new HashMap<>(); - DBFilesContainer(Path dbLocalBase, Path dbRemoteBase) throws IOException { - realFileSystem = LocalFileSystem.getLocalFileSystem(); + DBFilesContainer(Path dbLocalBase, Path dbRemoteBase, boolean pathCopying) + throws IOException { + realFileSystem = + pathCopying + ? new TestPathsCopyingLocalFileSystem() + : LocalFileSystem.getLocalFileSystem(); // prepare db paths this.dbDelegateFileSystem = ForStFlinkFileSystem.get(dbRemoteBase.toUri()); @@ -133,6 +162,12 @@ public class DataTransferStrategyTest { closeableRegistry = new CloseableRegistry(); } + private byte[] genRandomBytes(int length) { + byte[] b = new byte[length]; + rnd.nextBytes(b); + return b; + } + private void createDbFiles(List<String> fileNames) throws IOException { for (String fileName : fileNames) { Path dir = @@ -142,7 +177,7 @@ public class DataTransferStrategyTest { FSDataOutputStream output = dbDelegateFileSystem.create( new Path(dir, fileName), FileSystem.WriteMode.OVERWRITE); - output.write(fileName.getBytes(StandardCharsets.UTF_8)); + output.write(genRandomBytes(2048)); output.sync(); output.close(); dbFilePaths.put(fileName, new Path(dir, fileName)); @@ -230,6 +265,8 @@ public class DataTransferStrategyTest { closeableRegistry, tmpResourcesRegistry); snapshot.add(fileName, dbFilePath, handleAndLocalPath); + checkpointStreamFactory.canFastDuplicate( + handleAndLocalPath.getHandle(), CheckpointedStateScope.SHARED); } return snapshot; } @@ -280,37 +317,29 @@ public class DataTransferStrategyTest { return handles; } - List<Path> getFilePaths() { - List<Path> filePaths = new ArrayList<>(); - dbSnapshotFiles - .values() - .forEach( - tuple -> { - HandleAndLocalPath handleAndLocalPath = tuple.f1; - if (handleAndLocalPath.getHandle() instanceof FileStateHandle) { - Path filePath = - ((FileStateHandle) handleAndLocalPath.getHandle()) - .getFilePath(); - filePaths.add(filePath); - } - }); - return filePaths; - } - List<String> getDbFiles() { return new ArrayList<>(dbSnapshotFiles.keySet()); } - void checkAllFilesExist() { - getFilePaths() - .forEach( - path -> { - try { - assertThat(path.getFileSystem().exists(path)).isTrue(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + void checkAllFilesExist() throws IOException { + checkFilesExist(true, true); + checkFilesExist(false, true); + } + + // check whether the snapshots for local/remote files exist + void checkFilesExist(boolean shouldBeLocalFile, boolean shouldExist) throws IOException { + for (Tuple2<Path, HandleAndLocalPath> tuple : dbSnapshotFiles.values()) { + Path dbFilePath = tuple.f0; + StreamStateHandle handle = tuple.f1.getHandle(); + if (!(handle instanceof FileStateHandle) + || FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath) + != shouldBeLocalFile) { + continue; + } + Path realFilePath = ((FileStateHandle) handle).getFilePath(); + boolean exist = realFilePath.getFileSystem().exists(realFilePath); + assertThat(exist).isEqualTo(shouldExist); + } } } @@ -340,15 +369,20 @@ public class DataTransferStrategyTest { } } - @Parameters(name = " recoveryClaimMode = {0}, dbDirUnderCpDir = {1}") + @Parameters(name = " recoveryClaimMode = {0}, dbDirUnderCpDir = {1}, pathCopying = {2}") public static List<Object[]> parameters() { - return Arrays.asList( - new Object[][] { - {RecoveryClaimMode.NO_CLAIM, false}, - {RecoveryClaimMode.NO_CLAIM, true}, - {RecoveryClaimMode.CLAIM, false}, - {RecoveryClaimMode.CLAIM, true}, - }); + Object[] recoveryClaimModeParams = {RecoveryClaimMode.NO_CLAIM, RecoveryClaimMode.CLAIM}; + Object[] dbDirUnderCpDirParams = {false, true}; + Object[] pathCopyingParams = {false, true}; + List<Object[]> parameters = new ArrayList<>(); + for (Object recoveryClaimMode : recoveryClaimModeParams) { + for (Object dbDirUnderCpDir : dbDirUnderCpDirParams) { + for (Object pathCopying : pathCopyingParams) { + parameters.add(new Object[] {recoveryClaimMode, dbDirUnderCpDir, pathCopying}); + } + } + } + return parameters; } @Parameter public RecoveryClaimMode recoveryClaimMode; @@ -356,12 +390,19 @@ public class DataTransferStrategyTest { @Parameter(1) public Boolean dbDirUnderCpDir; + @Parameter(2) + public Boolean pathCopying; + @TempDir static java.nio.file.Path tempDir; private static final long MAX_TRANSFER_BYTES = Long.MAX_VALUE; private DBFilesContainer createDb( - JobID jobID, int subtaskIndex, int subtaskParallelism, boolean dbDirUnderCpDir) + JobID jobID, + int subtaskIndex, + int subtaskParallelism, + boolean dbDirUnderCpDir, + boolean pathCopying) throws IOException { String dbIdentifier = String.format("%s-db-%d-%d", jobID, subtaskIndex, subtaskParallelism); Path dbLocalBase = new Path(tempDir.toString(), String.format("local/%s", dbIdentifier)); @@ -371,7 +412,7 @@ public class DataTransferStrategyTest { dbDirUnderCpDir ? String.format("checkpoint/%s", dbIdentifier) : String.format("remote/%s", dbIdentifier)); - DBFilesContainer db = new DBFilesContainer(dbLocalBase, dbRemoteBase); + DBFilesContainer db = new DBFilesContainer(dbLocalBase, dbRemoteBase, pathCopying); db.clear(); return db; @@ -390,9 +431,11 @@ public class DataTransferStrategyTest { int subtaskIndex, int subtaskParallelism, boolean dbDirUnderCpDir, - RecoveryClaimMode claimMode) + RecoveryClaimMode claimMode, + boolean pathCopying) throws IOException { - DBFilesContainer db = createDb(jobID, subtaskIndex, subtaskParallelism, dbDirUnderCpDir); + DBFilesContainer db = + createDb(jobID, subtaskIndex, subtaskParallelism, dbDirUnderCpDir, pathCopying); DataTransferStrategy strategy = createDataTransferStrategy(db); return new Tuple2<>(db, strategy); } @@ -492,7 +535,7 @@ public class DataTransferStrategyTest { void simpleCaseTestRestore() throws IOException { JobID jobID = new JobID(); Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy = - createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode); + createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode, pathCopying); DBFilesContainer db = dbAndStrategy.f0; DataTransferStrategy strategy = dbAndStrategy.f1; @@ -527,7 +570,7 @@ public class DataTransferStrategyTest { FileNameGenerator fileNameGenerator = new FileNameGenerator(); JobID jobID = new JobID(); Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy = - createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode); + createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode, pathCopying); DBFilesContainer db = dbAndStrategy.f0; DataTransferStrategy strategy = dbAndStrategy.f1; @@ -550,7 +593,8 @@ public class DataTransferStrategyTest { db.clear(); // restore DB from snapshot - dbAndStrategy = createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode); + dbAndStrategy = + createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode, pathCopying); db = dbAndStrategy.f0; strategy = dbAndStrategy.f1; db.restoreFromSnapshot(strategy, lastSnapshot); @@ -567,4 +611,31 @@ public class DataTransferStrategyTest { db.clear(); db.checkStateHandleFilesExist(lastSnapshot.getStateHandles()); } + + @TestTemplate + void testUncompletedCheckpoint() throws IOException { + FileNameGenerator fileNameGenerator = new FileNameGenerator(); + JobID jobID = new JobID(); + Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy = + createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, recoveryClaimMode, pathCopying); + DBFilesContainer db = dbAndStrategy.f0; + DataTransferStrategy strategy = dbAndStrategy.f1; + + // create new files for DB + List<String> newDbFiles = fileNameGenerator.genMultipleFileNames(4, 4); + db.createDbFiles(newDbFiles); + db.checkDbFilesExist(newDbFiles); + + // create a snapshot + DBFilesSnapshot lastSnapshot = db.snapshot(strategy); + db.assertFilesReusedToCheckpoint(lastSnapshot.getStateHandles()); + + // check the snapshot files exist + lastSnapshot.checkAllFilesExist(); + + // clean the snapshot files + db.tmpResourcesRegistry.close(); + lastSnapshot.checkFilesExist(false, dbDirUnderCpDir); + lastSnapshot.checkFilesExist(true, false); + } }