This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new da38a9810b8 [FLINK-37867][state/forst] Ensure files of half-uploaded checkpoints are cleaned when using path copying (#26759)
da38a9810b8 is described below

commit da38a9810b816c59b14cbedf514d13bb47d094a5
Author: AlexYinHan <alexyin...@gmail.com>
AuthorDate: Thu Jul 17 11:21:30 2025 +0800

    [FLINK-37867][state/forst] Ensure files of half-uploaded checkpoints are cleaned when using path copying (#26759)
---
 .../datatransfer/CopyDataTransferStrategy.java     |  22 ++-
 .../datatransfer/DataTransferStrategyTest.java     | 169 +++++++++++++++------
 2 files changed, 138 insertions(+), 53 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
index 6686ba41878..9c9d188906b 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
@@ -123,7 +123,11 @@ public class CopyDataTransferStrategy extends DataTransferStrategy {
 
         // Try path-copying first. If failed, fallback to bytes-copying
         StreamStateHandle targetStateHandle =
-                tryPathCopyingToCheckpoint(sourceStateHandle, 
checkpointStreamFactory, stateScope);
+                tryPathCopyingToCheckpoint(
+                        sourceStateHandle,
+                        checkpointStreamFactory,
+                        stateScope,
+                        tmpResourcesRegistry);
         if (targetStateHandle != null) {
             LOG.trace("Path-copy file to checkpoint: {} {}", dbFilePath, 
targetStateHandle);
         } else {
@@ -148,22 +152,29 @@ public class CopyDataTransferStrategy extends DataTransferStrategy {
      * @param sourceHandle The source state handle
      * @param checkpointStreamFactory The checkpoint stream factory
      * @param stateScope The state scope
+     * @param tmpResourcesRegistry The temporary resources registry
      * @return The target state handle if path-copying is successful, 
otherwise null
      */
     private @Nullable StreamStateHandle tryPathCopyingToCheckpoint(
             @Nonnull StreamStateHandle sourceHandle,
             CheckpointStreamFactory checkpointStreamFactory,
-            CheckpointedStateScope stateScope) {
+            CheckpointedStateScope stateScope,
+            CloseableRegistry tmpResourcesRegistry) {
 
         try {
+            // copy the file by duplicating
             if (!checkpointStreamFactory.canFastDuplicate(sourceHandle, 
stateScope)) {
                 return null;
             }
-
             List<StreamStateHandle> result =
                     checkpointStreamFactory.duplicate(
                             Collections.singletonList(sourceHandle), 
stateScope);
-            return result.get(0);
+            StreamStateHandle resultStateHandle = result.get(0);
+
+            // register the clean-up logic of the uploaded file
+            tmpResourcesRegistry.registerCloseable(
+                    () -> 
StateUtil.discardStateObjectQuietly(resultStateHandle));
+            return resultStateHandle;
         } catch (Exception e) {
             LOG.warn("Failed to duplicate file to checkpoint: {} {}", 
sourceHandle, stateScope, e);
         }
@@ -179,6 +190,7 @@ public class CopyDataTransferStrategy extends DataTransferStrategy {
             CloseableRegistry closeableRegistry,
             CloseableRegistry tmpResourcesRegistry)
             throws IOException {
+        // copy the file by bytes
         InputStream inputStream = null;
         CheckpointStateOutputStream outputStream = null;
 
@@ -211,6 +223,8 @@ public class CopyDataTransferStrategy extends DataTransferStrategy {
             } else {
                 result = null;
             }
+
+            // register the clean-up logic of the uploaded file
             tmpResourcesRegistry.registerCloseable(
                     () -> StateUtil.discardStateObjectQuietly(result));
 
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
index 459348e49b5..77d80eee012 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
@@ -24,7 +24,9 @@ import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.ICloseableRegistry;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -57,13 +59,12 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -89,14 +90,38 @@ public class DataTransferStrategyTest {
         }
     }
 
+    /** Dummy local file system that 'implements' path copying. */
+    static class TestPathsCopyingLocalFileSystem extends LocalFileSystem
+            implements PathsCopyingFileSystem {
+
+        TestPathsCopyingLocalFileSystem() {
+            super();
+        }
+
+        @Override
+        public void copyFiles(List<CopyRequest> requests, ICloseableRegistry 
closeableRegistry)
+                throws IOException {
+            for (CopyRequest request : requests) {
+                Path source = request.getSource();
+                Path destination = request.getDestination();
+                FileUtils.copy(source, destination, false);
+            }
+        }
+
+        @Override
+        public boolean canCopyPaths(Path source, Path destination) throws 
IOException {
+            return true;
+        }
+    }
+
     /** Container for DB files. */
     static class DBFilesContainer {
         static CheckpointPathsContainer cpPathContainer = null;
 
-        FileSystem realFileSystem;
+        private Random rnd = new Random();
 
+        FileSystem realFileSystem;
         Path dbCheckpointBase;
-
         protected CheckpointStreamFactory checkpointStreamFactory;
 
         protected CloseableRegistry closeableRegistry;
@@ -109,8 +134,12 @@ public class DataTransferStrategyTest {
 
         Map<String, Path> dbFilePaths = new HashMap<>();
 
-        DBFilesContainer(Path dbLocalBase, Path dbRemoteBase) throws 
IOException {
-            realFileSystem = LocalFileSystem.getLocalFileSystem();
+        DBFilesContainer(Path dbLocalBase, Path dbRemoteBase, boolean 
pathCopying)
+                throws IOException {
+            realFileSystem =
+                    pathCopying
+                            ? new TestPathsCopyingLocalFileSystem()
+                            : LocalFileSystem.getLocalFileSystem();
 
             // prepare db paths
             this.dbDelegateFileSystem = 
ForStFlinkFileSystem.get(dbRemoteBase.toUri());
@@ -133,6 +162,12 @@ public class DataTransferStrategyTest {
             closeableRegistry = new CloseableRegistry();
         }
 
+        private byte[] genRandomBytes(int length) {
+            byte[] b = new byte[length];
+            rnd.nextBytes(b);
+            return b;
+        }
+
         private void createDbFiles(List<String> fileNames) throws IOException {
             for (String fileName : fileNames) {
                 Path dir =
@@ -142,7 +177,7 @@ public class DataTransferStrategyTest {
                 FSDataOutputStream output =
                         dbDelegateFileSystem.create(
                                 new Path(dir, fileName), 
FileSystem.WriteMode.OVERWRITE);
-                output.write(fileName.getBytes(StandardCharsets.UTF_8));
+                output.write(genRandomBytes(2048));
                 output.sync();
                 output.close();
                 dbFilePaths.put(fileName, new Path(dir, fileName));
@@ -230,6 +265,8 @@ public class DataTransferStrategyTest {
                                 closeableRegistry,
                                 tmpResourcesRegistry);
                 snapshot.add(fileName, dbFilePath, handleAndLocalPath);
+                checkpointStreamFactory.canFastDuplicate(
+                        handleAndLocalPath.getHandle(), 
CheckpointedStateScope.SHARED);
             }
             return snapshot;
         }
@@ -280,37 +317,29 @@ public class DataTransferStrategyTest {
             return handles;
         }
 
-        List<Path> getFilePaths() {
-            List<Path> filePaths = new ArrayList<>();
-            dbSnapshotFiles
-                    .values()
-                    .forEach(
-                            tuple -> {
-                                HandleAndLocalPath handleAndLocalPath = 
tuple.f1;
-                                if (handleAndLocalPath.getHandle() instanceof 
FileStateHandle) {
-                                    Path filePath =
-                                            ((FileStateHandle) 
handleAndLocalPath.getHandle())
-                                                    .getFilePath();
-                                    filePaths.add(filePath);
-                                }
-                            });
-            return filePaths;
-        }
-
         List<String> getDbFiles() {
             return new ArrayList<>(dbSnapshotFiles.keySet());
         }
 
-        void checkAllFilesExist() {
-            getFilePaths()
-                    .forEach(
-                            path -> {
-                                try {
-                                    
assertThat(path.getFileSystem().exists(path)).isTrue();
-                                } catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            });
+        void checkAllFilesExist() throws IOException {
+            checkFilesExist(true, true);
+            checkFilesExist(false, true);
+        }
+
+        // check whether the snapshots for local/remote files exist
+        void checkFilesExist(boolean shouldBeLocalFile, boolean shouldExist) 
throws IOException {
+            for (Tuple2<Path, HandleAndLocalPath> tuple : 
dbSnapshotFiles.values()) {
+                Path dbFilePath = tuple.f0;
+                StreamStateHandle handle = tuple.f1.getHandle();
+                if (!(handle instanceof FileStateHandle)
+                        || FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath)
+                                != shouldBeLocalFile) {
+                    continue;
+                }
+                Path realFilePath = ((FileStateHandle) handle).getFilePath();
+                boolean exist = 
realFilePath.getFileSystem().exists(realFilePath);
+                assertThat(exist).isEqualTo(shouldExist);
+            }
         }
     }
 
@@ -340,15 +369,20 @@ public class DataTransferStrategyTest {
         }
     }
 
-    @Parameters(name = " recoveryClaimMode = {0}, dbDirUnderCpDir = {1}")
+    @Parameters(name = " recoveryClaimMode = {0}, dbDirUnderCpDir = {1}, 
pathCopying = {2}")
     public static List<Object[]> parameters() {
-        return Arrays.asList(
-                new Object[][] {
-                    {RecoveryClaimMode.NO_CLAIM, false},
-                    {RecoveryClaimMode.NO_CLAIM, true},
-                    {RecoveryClaimMode.CLAIM, false},
-                    {RecoveryClaimMode.CLAIM, true},
-                });
+        Object[] recoveryClaimModeParams = {RecoveryClaimMode.NO_CLAIM, 
RecoveryClaimMode.CLAIM};
+        Object[] dbDirUnderCpDirParams = {false, true};
+        Object[] pathCopyingParams = {false, true};
+        List<Object[]> parameters = new ArrayList<>();
+        for (Object recoveryClaimMode : recoveryClaimModeParams) {
+            for (Object dbDirUnderCpDir : dbDirUnderCpDirParams) {
+                for (Object pathCopying : pathCopyingParams) {
+                    parameters.add(new Object[] {recoveryClaimMode, 
dbDirUnderCpDir, pathCopying});
+                }
+            }
+        }
+        return parameters;
     }
 
     @Parameter public RecoveryClaimMode recoveryClaimMode;
@@ -356,12 +390,19 @@ public class DataTransferStrategyTest {
     @Parameter(1)
     public Boolean dbDirUnderCpDir;
 
+    @Parameter(2)
+    public Boolean pathCopying;
+
     @TempDir static java.nio.file.Path tempDir;
 
     private static final long MAX_TRANSFER_BYTES = Long.MAX_VALUE;
 
     private DBFilesContainer createDb(
-            JobID jobID, int subtaskIndex, int subtaskParallelism, boolean 
dbDirUnderCpDir)
+            JobID jobID,
+            int subtaskIndex,
+            int subtaskParallelism,
+            boolean dbDirUnderCpDir,
+            boolean pathCopying)
             throws IOException {
         String dbIdentifier = String.format("%s-db-%d-%d", jobID, 
subtaskIndex, subtaskParallelism);
         Path dbLocalBase = new Path(tempDir.toString(), 
String.format("local/%s", dbIdentifier));
@@ -371,7 +412,7 @@ public class DataTransferStrategyTest {
                         dbDirUnderCpDir
                                 ? String.format("checkpoint/%s", dbIdentifier)
                                 : String.format("remote/%s", dbIdentifier));
-        DBFilesContainer db = new DBFilesContainer(dbLocalBase, dbRemoteBase);
+        DBFilesContainer db = new DBFilesContainer(dbLocalBase, dbRemoteBase, 
pathCopying);
         db.clear();
 
         return db;
@@ -390,9 +431,11 @@ public class DataTransferStrategyTest {
             int subtaskIndex,
             int subtaskParallelism,
             boolean dbDirUnderCpDir,
-            RecoveryClaimMode claimMode)
+            RecoveryClaimMode claimMode,
+            boolean pathCopying)
             throws IOException {
-        DBFilesContainer db = createDb(jobID, subtaskIndex, 
subtaskParallelism, dbDirUnderCpDir);
+        DBFilesContainer db =
+                createDb(jobID, subtaskIndex, subtaskParallelism, 
dbDirUnderCpDir, pathCopying);
         DataTransferStrategy strategy = createDataTransferStrategy(db);
         return new Tuple2<>(db, strategy);
     }
@@ -492,7 +535,7 @@ public class DataTransferStrategyTest {
     void simpleCaseTestRestore() throws IOException {
         JobID jobID = new JobID();
         Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy =
-                createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode);
+                createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode, pathCopying);
         DBFilesContainer db = dbAndStrategy.f0;
         DataTransferStrategy strategy = dbAndStrategy.f1;
 
@@ -527,7 +570,7 @@ public class DataTransferStrategyTest {
         FileNameGenerator fileNameGenerator = new FileNameGenerator();
         JobID jobID = new JobID();
         Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy =
-                createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode);
+                createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode, pathCopying);
         DBFilesContainer db = dbAndStrategy.f0;
         DataTransferStrategy strategy = dbAndStrategy.f1;
 
@@ -550,7 +593,8 @@ public class DataTransferStrategyTest {
             db.clear();
 
             // restore DB from snapshot
-            dbAndStrategy = createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode);
+            dbAndStrategy =
+                    createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode, pathCopying);
             db = dbAndStrategy.f0;
             strategy = dbAndStrategy.f1;
             db.restoreFromSnapshot(strategy, lastSnapshot);
@@ -567,4 +611,31 @@ public class DataTransferStrategyTest {
         db.clear();
         db.checkStateHandleFilesExist(lastSnapshot.getStateHandles());
     }
+
+    @TestTemplate
+    void testUncompletedCheckpoint() throws IOException {
+        FileNameGenerator fileNameGenerator = new FileNameGenerator();
+        JobID jobID = new JobID();
+        Tuple2<DBFilesContainer, DataTransferStrategy> dbAndStrategy =
+                createOrRestoreDb(jobID, 0, 1, dbDirUnderCpDir, 
recoveryClaimMode, pathCopying);
+        DBFilesContainer db = dbAndStrategy.f0;
+        DataTransferStrategy strategy = dbAndStrategy.f1;
+
+        // create new files for DB
+        List<String> newDbFiles = fileNameGenerator.genMultipleFileNames(4, 4);
+        db.createDbFiles(newDbFiles);
+        db.checkDbFilesExist(newDbFiles);
+
+        // create a snapshot
+        DBFilesSnapshot lastSnapshot = db.snapshot(strategy);
+        db.assertFilesReusedToCheckpoint(lastSnapshot.getStateHandles());
+
+        // check the snapshot files exist
+        lastSnapshot.checkAllFilesExist();
+
+        // clean the snapshot files
+        db.tmpResourcesRegistry.close();
+        lastSnapshot.checkFilesExist(false, dbDirUnderCpDir);
+        lastSnapshot.checkFilesExist(true, false);
+    }
 }

Reply via email to