This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 841f23c73e4 [FLINK-32074][checkpoint] Merge file across checkpoints
841f23c73e4 is described below

commit 841f23c73e4399df91112dd11ddca74f45ea5b37
Author: Zakelly <[email protected]>
AuthorDate: Thu Mar 14 15:26:55 2024 +0800

    [FLINK-32074][checkpoint] Merge file across checkpoints
---
 ...AcrossCheckpointFileMergingSnapshotManager.java |  66 ++++++++
 .../FileMergingSnapshotManagerBuilder.java         |  30 +++-
 .../checkpoint/filemerging/FileMergingType.java    |  26 +++
 .../state/TaskExecutorFileMergingManager.java      |   5 +-
 ...ssCheckpointFileMergingSnapshotManagerTest.java | 171 ++++++++++++++++++++
 ...ava => FileMergingSnapshotManagerTestBase.java} | 169 ++------------------
 ...inCheckpointFileMergingSnapshotManagerTest.java | 176 +++++++++++++++++++++
 .../FsMergingCheckpointStorageLocationTest.java    |   5 +-
 8 files changed, 487 insertions(+), 161 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
new file mode 100644
index 00000000000..941cb98a1c5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merges files across 
checkpoints. */
+public class AcrossCheckpointFileMergingSnapshotManager extends 
FileMergingSnapshotManagerBase {
+
+    private final PhysicalFilePool filePool;
+
+    public AcrossCheckpointFileMergingSnapshotManager(
+            String id, long maxFileSize, PhysicalFilePool.Type filePoolType, 
Executor ioExecutor) {
+        super(id, maxFileSize, filePoolType, ioExecutor);
+        filePool = createPhysicalPool();
+    }
+
+    @Override
+    @Nonnull
+    protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointID, CheckpointedStateScope 
scope)
+            throws IOException {
+        return filePool.pollFile(subtaskKey, scope);
+    }
+
+    @Override
+    protected void discardCheckpoint(long checkpointId) {}
+
+    @Override
+    protected void returnPhysicalFileForNextReuse(
+            SubtaskKey subtaskKey, long checkpointId, PhysicalFile 
physicalFile)
+            throws IOException {
+
+        if (shouldSyncAfterClosingLogicalFile) {
+            FSDataOutputStream os = physicalFile.getOutputStream();
+            if (os != null) {
+                os.sync();
+            }
+        }
+
+        if (!filePool.tryPutFile(subtaskKey, physicalFile)) {
+            physicalFile.close();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
index ca1d5b67501..3a99133bf20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
@@ -29,6 +29,9 @@ public class FileMergingSnapshotManagerBuilder {
     /** The id for identifying a {@link FileMergingSnapshotManager}. */
     private final String id;
 
+    /** The file merging type. */
+    private final FileMergingType fileMergingType;
+
     /** Max size for a file. TODO: Make it configurable. */
     private long maxFileSize = 32 * 1024 * 1024;
 
@@ -42,8 +45,9 @@ public class FileMergingSnapshotManagerBuilder {
      *
      * @param id the id of the manager.
      */
-    public FileMergingSnapshotManagerBuilder(String id) {
+    public FileMergingSnapshotManagerBuilder(String id, FileMergingType type) {
         this.id = id;
+        this.fileMergingType = type;
     }
 
     /** Set the max file size. */
@@ -71,13 +75,27 @@ public class FileMergingSnapshotManagerBuilder {
     /**
      * Create file-merging snapshot manager based on configuration.
      *
-     * <p>TODO (FLINK-32074): Support another type of 
FileMergingSnapshotManager that merges files
-     * across different checkpoints.
-     *
      * @return the created manager.
      */
     public FileMergingSnapshotManager build() {
-        return new WithinCheckpointFileMergingSnapshotManager(
-                id, maxFileSize, filePoolType, ioExecutor == null ? 
Runnable::run : ioExecutor);
+        switch (fileMergingType) {
+            case MERGE_WITHIN_CHECKPOINT:
+                return new WithinCheckpointFileMergingSnapshotManager(
+                        id,
+                        maxFileSize,
+                        filePoolType,
+                        ioExecutor == null ? Runnable::run : ioExecutor);
+            case MERGE_ACROSS_CHECKPOINT:
+                return new AcrossCheckpointFileMergingSnapshotManager(
+                        id,
+                        maxFileSize,
+                        filePoolType,
+                        ioExecutor == null ? Runnable::run : ioExecutor);
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported type %s when creating file 
merging manager",
+                                fileMergingType));
+        }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java
new file mode 100644
index 00000000000..f9fbc460b88
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+/** How checkpoint files are merged relative to checkpoint boundaries. */
+public enum FileMergingType {
+    // merge checkpoint files within checkpoint boundaries
+    MERGE_WITHIN_CHECKPOINT,
+    // merge checkpoint files across checkpoint boundaries
+    MERGE_ACROSS_CHECKPOINT
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
index 42d8a4a5469..2b48d8a07e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
@@ -78,7 +79,9 @@ public class TaskExecutorFileMergingManager {
             if (fileMergingSnapshotManager == null) {
                 // TODO FLINK-32440: choose different 
FileMergingSnapshotManager by configuration
                 fileMergingSnapshotManager =
-                        new 
FileMergingSnapshotManagerBuilder(jobId.toString()).build();
+                        new FileMergingSnapshotManagerBuilder(
+                                        jobId.toString(), 
FileMergingType.MERGE_WITHIN_CHECKPOINT)
+                                .build();
                 fileMergingSnapshotManagerByJobId.put(jobId, 
fileMergingSnapshotManager);
                 LOG.info("Registered new file merging snapshot manager for job 
{}.", jobId);
             }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
new file mode 100644
index 00000000000..475c13b5445
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AcrossCheckpointFileMergingSnapshotManager}. */
+public class AcrossCheckpointFileMergingSnapshotManagerTest
+        extends FileMergingSnapshotManagerTestBase {
+    @Override
+    FileMergingType getFileMergingType() {
+        return FileMergingType.MERGE_ACROSS_CHECKPOINT;
+    }
+
+    @Test
+    void testCreateAndReuseFiles() throws IOException {
+        try (FileMergingSnapshotManagerBase fmsm =
+                (FileMergingSnapshotManagerBase)
+                        createFileMergingSnapshotManager(checkpointBaseDir)) {
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
+            fmsm.registerSubtaskForSharedStates(subtaskKey2);
+            // firstly, we try shared state.
+            PhysicalFile file1 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file1.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
+            // allocate another
+            PhysicalFile file2 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file2.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
+            assertThat(file2).isNotEqualTo(file1);
+
+            // return for reuse
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
+
+            // allocate for another subtask
+            PhysicalFile file3 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey2, 0, CheckpointedStateScope.SHARED);
+            assertThat(file3.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, 
CheckpointedStateScope.SHARED));
+            assertThat(file3).isNotEqualTo(file1);
+
+            // allocate for another checkpoint
+            PhysicalFile file4 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.SHARED);
+            assertThat(file4.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
+            assertThat(file4).isEqualTo(file1);
+
+            // a physical file whose size is bigger than maxPhysicalFileSize 
cannot be reused
+            file4.incSize(fmsm.maxPhysicalFileSize);
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 1, file4);
+            PhysicalFile file5 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.SHARED);
+            assertThat(file5.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
+            assertThat(file5).isNotEqualTo(file4);
+
+            // Secondly, we try private state
+            PhysicalFile file6 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file6.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
+
+            // allocate another
+            PhysicalFile file7 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file7.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file7).isNotEqualTo(file5);
+
+            // return for reuse
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6);
+
+            // allocate for another checkpoint
+            PhysicalFile file8 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 2, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file8.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file8).isEqualTo(file6);
+
+            // return for reuse
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file8);
+
+            // allocate for this checkpoint but another subtask
+            PhysicalFile file9 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey2, 2, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file9.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, 
CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file9).isEqualTo(file6);
+
+            // a physical file whose size is bigger than maxPhysicalFileSize 
cannot be reused
+            file9.incSize(fmsm.maxPhysicalFileSize);
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 2, file9);
+            PhysicalFile file10 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 2, CheckpointedStateScope.SHARED);
+            assertThat(file10.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
+            assertThat(file10).isNotEqualTo(file9);
+
+            assertThat(fmsm.getManagedDir(subtaskKey2, 
CheckpointedStateScope.EXCLUSIVE))
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
+        }
+    }
+
+    @Test
+    public void testCheckpointNotification() throws Exception {
+        try (FileMergingSnapshotManager fmsm = 
createFileMergingSnapshotManager(checkpointBaseDir);
+                CloseableRegistry closeableRegistry = new CloseableRegistry()) 
{
+            FileMergingCheckpointStateOutputStream cp1Stream =
+                    writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp1StateHandle = 
cp1Stream.closeAndGetHandle();
+            fmsm.notifyCheckpointComplete(subtaskKey1, 1);
+            assertFileInManagedDir(fmsm, cp1StateHandle);
+
+            // complete checkpoint-2
+            FileMergingCheckpointStateOutputStream cp2Stream =
+                    writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp2StateHandle = 
cp2Stream.closeAndGetHandle();
+            fmsm.notifyCheckpointComplete(subtaskKey1, 2);
+            assertFileInManagedDir(fmsm, cp2StateHandle);
+
+            // subsume checkpoint-1
+            assertThat(fileExists(cp1StateHandle)).isTrue();
+            fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
+            assertThat(fileExists(cp1StateHandle)).isTrue();
+
+            // abort checkpoint-3
+            FileMergingCheckpointStateOutputStream cp3Stream =
+                    writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp3StateHandle = 
cp3Stream.closeAndGetHandle();
+            assertFileInManagedDir(fmsm, cp3StateHandle);
+            fmsm.notifyCheckpointAborted(subtaskKey1, 3);
+            assertThat(fileExists(cp3StateHandle)).isTrue();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
similarity index 67%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
index be47d272900..d6553060e2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
@@ -43,18 +43,20 @@ import java.util.concurrent.Future;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FileMergingSnapshotManager}. */
-public class FileMergingSnapshotManagerTest {
+public abstract class FileMergingSnapshotManagerTestBase {
 
-    private final String tmId = "Testing";
+    final String tmId = "Testing";
 
-    private final OperatorID operatorID = new OperatorID(289347923L, 
75893479L);
+    final OperatorID operatorID = new OperatorID(289347923L, 75893479L);
 
-    private SubtaskKey subtaskKey1;
-    private SubtaskKey subtaskKey2;
+    SubtaskKey subtaskKey1;
+    SubtaskKey subtaskKey2;
 
-    private Path checkpointBaseDir;
+    Path checkpointBaseDir;
 
-    private int writeBufferSize;
+    int writeBufferSize;
+
+    abstract FileMergingType getFileMergingType();
 
     @BeforeEach
     public void setup(@TempDir java.nio.file.Path tempFolder) {
@@ -90,113 +92,6 @@ public class FileMergingSnapshotManagerTest {
         }
     }
 
-    @Test
-    void testCreateAndReuseFiles() throws IOException {
-        try (FileMergingSnapshotManagerBase fmsm =
-                (FileMergingSnapshotManagerBase)
-                        createFileMergingSnapshotManager(checkpointBaseDir)) {
-            fmsm.registerSubtaskForSharedStates(subtaskKey1);
-            fmsm.registerSubtaskForSharedStates(subtaskKey2);
-            // firstly, we try shared state.
-            PhysicalFile file1 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
-            assertThat(file1.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
-            // allocate another
-            PhysicalFile file2 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
-            assertThat(file2.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
-            assertThat(file2).isNotEqualTo(file1);
-
-            // return for reuse
-            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
-
-            // allocate for another subtask
-            PhysicalFile file3 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey2, 0, CheckpointedStateScope.SHARED);
-            assertThat(file3.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, 
CheckpointedStateScope.SHARED));
-            assertThat(file3).isNotEqualTo(file1);
-
-            // allocate for another checkpoint
-            PhysicalFile file4 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 1, CheckpointedStateScope.SHARED);
-            assertThat(file4.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
-            assertThat(file4).isNotEqualTo(file1);
-
-            // allocate for this checkpoint
-            PhysicalFile file5 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
-            assertThat(file5.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
-            assertThat(file5).isEqualTo(file1);
-
-            // a physical file whose size is bigger than maxPhysicalFileSize 
cannot be reused
-            file5.incSize(fmsm.maxPhysicalFileSize);
-            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5);
-            PhysicalFile file6 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
-            assertThat(file6.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
-            assertThat(file6).isNotEqualTo(file5);
-
-            // Secondly, we try private state
-            PhysicalFile file7 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
-            assertThat(file7.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
-
-            // allocate another
-            PhysicalFile file8 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
-            assertThat(file8.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
-            assertThat(file8).isNotEqualTo(file6);
-
-            // return for reuse
-            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7);
-
-            // allocate for another checkpoint
-            PhysicalFile file9 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
-            assertThat(file9.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
-            assertThat(file9).isNotEqualTo(file7);
-
-            // allocate for this checkpoint but another subtask
-            PhysicalFile file10 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE);
-            assertThat(file10.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, 
CheckpointedStateScope.EXCLUSIVE));
-            assertThat(file10).isEqualTo(file7);
-
-            // a physical file whose size is bigger than maxPhysicalFileSize 
cannot be reused
-            file10.incSize(fmsm.maxPhysicalFileSize);
-            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10);
-            PhysicalFile file11 =
-                    fmsm.getOrCreatePhysicalFileForCheckpoint(
-                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
-            assertThat(file11.getFilePath().getParent())
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.SHARED));
-            assertThat(file11).isNotEqualTo(file10);
-
-            assertThat(fmsm.getManagedDir(subtaskKey2, 
CheckpointedStateScope.EXCLUSIVE))
-                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE));
-        }
-    }
-
     @Test
     void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException {
         try (FileMergingSnapshotManagerBase fmsm =
@@ -378,38 +273,6 @@ public class FileMergingSnapshotManagerTest {
         }
     }
 
-    @Test
-    public void testCheckpointNotification() throws Exception {
-        try (FileMergingSnapshotManager fmsm = 
createFileMergingSnapshotManager(checkpointBaseDir);
-                CloseableRegistry closeableRegistry = new CloseableRegistry()) 
{
-            FileMergingCheckpointStateOutputStream cp1Stream =
-                    writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
-            SegmentFileStateHandle cp1StateHandle = 
cp1Stream.closeAndGetHandle();
-            fmsm.notifyCheckpointComplete(subtaskKey1, 1);
-            assertFileInManagedDir(fmsm, cp1StateHandle);
-
-            // complete checkpoint-2
-            FileMergingCheckpointStateOutputStream cp2Stream =
-                    writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
-            SegmentFileStateHandle cp2StateHandle = 
cp2Stream.closeAndGetHandle();
-            fmsm.notifyCheckpointComplete(subtaskKey1, 2);
-            assertFileInManagedDir(fmsm, cp2StateHandle);
-
-            // subsume checkpoint-1
-            assertThat(fileExists(cp1StateHandle)).isTrue();
-            fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
-            assertThat(fileExists(cp1StateHandle)).isFalse();
-
-            // abort checkpoint-3
-            FileMergingCheckpointStateOutputStream cp3Stream =
-                    writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
-            SegmentFileStateHandle cp3StateHandle = 
cp3Stream.closeAndGetHandle();
-            assertFileInManagedDir(fmsm, cp3StateHandle);
-            fmsm.notifyCheckpointAborted(subtaskKey1, 3);
-            assertThat(fileExists(cp3StateHandle)).isFalse();
-        }
-    }
-
     @Test
     public void testConcurrentFileReusingWithBlockingPool() throws Exception {
         try (FileMergingSnapshotManagerBase fmsm =
@@ -455,13 +318,13 @@ public class FileMergingSnapshotManagerTest {
         }
     }
 
-    private FileMergingSnapshotManager createFileMergingSnapshotManager(Path 
checkpointBaseDir)
+    FileMergingSnapshotManager createFileMergingSnapshotManager(Path 
checkpointBaseDir)
             throws IOException {
         return createFileMergingSnapshotManager(
                 checkpointBaseDir, 32 * 1024 * 1024, 
PhysicalFilePool.Type.NON_BLOCKING);
     }
 
-    private FileMergingSnapshotManager createFileMergingSnapshotManager(
+    FileMergingSnapshotManager createFileMergingSnapshotManager(
             Path checkpointBaseDir, long maxFileSize, PhysicalFilePool.Type 
filePoolType)
             throws IOException {
         FileSystem fs = LocalFileSystem.getSharedInstance();
@@ -479,7 +342,7 @@ public class FileMergingSnapshotManagerTest {
             fs.mkdirs(taskOwnedStateDir);
         }
         FileMergingSnapshotManager fmsm =
-                new FileMergingSnapshotManagerBuilder(tmId)
+                new FileMergingSnapshotManagerBuilder(tmId, 
getFileMergingType())
                         .setMaxFileSize(maxFileSize)
                         .setFilePoolType(filePoolType)
                         .build();
@@ -493,13 +356,13 @@ public class FileMergingSnapshotManagerTest {
         return fmsm;
     }
 
-    private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+    FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
             long checkpointId, FileMergingSnapshotManager fmsm, 
CloseableRegistry closeableRegistry)
             throws IOException {
         return writeCheckpointAndGetStream(checkpointId, fmsm, 
closeableRegistry, 32);
     }
 
-    private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+    FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
             long checkpointId,
             FileMergingSnapshotManager fmsm,
             CloseableRegistry closeableRegistry,
@@ -515,7 +378,7 @@ public class FileMergingSnapshotManagerTest {
         return stream;
     }
 
-    private void assertFileInManagedDir(
+    void assertFileInManagedDir(
             FileMergingSnapshotManager fmsm, SegmentFileStateHandle 
stateHandle) {
         assertThat(fmsm instanceof FileMergingSnapshotManagerBase).isTrue();
         assertThat(stateHandle).isNotNull();
@@ -524,7 +387,7 @@ public class FileMergingSnapshotManagerTest {
         assertThat(((FileMergingSnapshotManagerBase) 
fmsm).isResponsibleForFile(filePath)).isTrue();
     }
 
-    private boolean fileExists(SegmentFileStateHandle stateHandle) throws 
IOException {
+    boolean fileExists(SegmentFileStateHandle stateHandle) throws IOException {
         assertThat(stateHandle).isNotNull();
         Path filePath = stateHandle.getFilePath();
         assertThat(filePath).isNotNull();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
new file mode 100644
index 00000000000..4128e5e7cd3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link WithinCheckpointFileMergingSnapshotManager}.
+ *
+ * <p>Selects {@link FileMergingType#MERGE_WITHIN_CHECKPOINT} by overriding {@link
+ * #getFileMergingType()} and adds the test cases that depend on this merging type.
+ */
+public class WithinCheckpointFileMergingSnapshotManagerTest
+        extends FileMergingSnapshotManagerTestBase {
+    @Override
+    FileMergingType getFileMergingType() {
+        return FileMergingType.MERGE_WITHIN_CHECKPOINT;
+    }
+
+    /**
+     * Checks which {@link PhysicalFile} is handed out by {@code
+     * getOrCreatePhysicalFileForCheckpoint} before and after files are given back through {@code
+     * returnPhysicalFileForNextReuse}.
+     *
+     * <p>The assertions expect that:
+     *
+     * <ul>
+     *   <li>a returned SHARED file is handed out again only for the same subtask and the same
+     *       checkpoint;
+     *   <li>a returned EXCLUSIVE file is handed out again for the same checkpoint, also to another
+     *       subtask, but not for another checkpoint;
+     *   <li>a file whose size was increased by {@code maxPhysicalFileSize} is not handed out
+     *       again, in either scope.
+     * </ul>
+     */
+    @Test
+    void testCreateAndReuseFiles() throws IOException {
+        try (FileMergingSnapshotManagerBase fmsm =
+                (FileMergingSnapshotManagerBase)
+                        createFileMergingSnapshotManager(checkpointBaseDir)) {
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
+            fmsm.registerSubtaskForSharedStates(subtaskKey2);
+            // Firstly, we try shared state.
+            PhysicalFile file1 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file1.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            // allocate another
+            PhysicalFile file2 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file2.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            assertThat(file2).isNotEqualTo(file1);
+
+            // return for reuse
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
+
+            // allocate for another subtask
+            PhysicalFile file3 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey2, 0, CheckpointedStateScope.SHARED);
+            assertThat(file3.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED));
+            assertThat(file3).isNotEqualTo(file1);
+
+            // allocate for another checkpoint
+            PhysicalFile file4 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.SHARED);
+            assertThat(file4.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            assertThat(file4).isNotEqualTo(file1);
+
+            // allocate for this checkpoint
+            PhysicalFile file5 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file5.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            assertThat(file5).isEqualTo(file1);
+
+            // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused
+            file5.incSize(fmsm.maxPhysicalFileSize);
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5);
+            PhysicalFile file6 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file6.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            assertThat(file6).isNotEqualTo(file5);
+
+            // Secondly, we try private state
+            PhysicalFile file7 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file7.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+
+            // allocate another; file7 has not been returned yet, so it must not be handed out
+            PhysicalFile file8 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file8.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file8).isNotEqualTo(file7);
+
+            // return for reuse
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7);
+
+            // allocate for another checkpoint
+            PhysicalFile file9 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file9.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file9).isNotEqualTo(file7);
+
+            // allocate for this checkpoint but another subtask
+            PhysicalFile file10 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file10.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file10).isEqualTo(file7);
+
+            // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused;
+            // request the EXCLUSIVE scope again so that the returned file10 is the candidate
+            file10.incSize(fmsm.maxPhysicalFileSize);
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10);
+            PhysicalFile file11 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file11.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file11).isNotEqualTo(file10);
+
+            assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE))
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+        }
+    }
+
+    /**
+     * Checks how the files written for a checkpoint react to checkpoint notifications.
+     *
+     * <p>The file of checkpoint 1 is expected to still exist after checkpoints 1 and 2 are
+     * completed and to be gone once checkpoint 1 is subsumed; the file of checkpoint 3 is expected
+     * to be gone once checkpoint 3 is aborted.
+     */
+    @Test
+    public void testCheckpointNotification() throws Exception {
+        try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
+                CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+            // complete checkpoint-1
+            FileMergingCheckpointStateOutputStream cp1Stream =
+                    writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle();
+            fmsm.notifyCheckpointComplete(subtaskKey1, 1);
+            assertFileInManagedDir(fmsm, cp1StateHandle);
+
+            // complete checkpoint-2
+            FileMergingCheckpointStateOutputStream cp2Stream =
+                    writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle();
+            fmsm.notifyCheckpointComplete(subtaskKey1, 2);
+            assertFileInManagedDir(fmsm, cp2StateHandle);
+
+            // subsume checkpoint-1
+            assertThat(fileExists(cp1StateHandle)).isTrue();
+            fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
+            assertThat(fileExists(cp1StateHandle)).isFalse();
+
+            // abort checkpoint-3
+            FileMergingCheckpointStateOutputStream cp3Stream =
+                    writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle();
+            assertFileInManagedDir(fmsm, cp3StateHandle);
+            fmsm.notifyCheckpointAborted(subtaskKey1, 3);
+            assertThat(fileExists(cp3StateHandle)).isFalse();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
index 37fbaa98f09..d7796cf7f4f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
 import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -220,7 +221,9 @@ public class FsMergingCheckpointStorageLocationTest {
 
     private FileMergingSnapshotManager createFileMergingSnapshotManager(long 
maxFileSize) {
         FileMergingSnapshotManager mgr =
-                new FileMergingSnapshotManagerBuilder(SNAPSHOT_MGR_ID).build();
+                new FileMergingSnapshotManagerBuilder(
+                                SNAPSHOT_MGR_ID, 
FileMergingType.MERGE_WITHIN_CHECKPOINT)
+                        .build();
 
         mgr.initFileSystem(
                 getSharedInstance(),


Reply via email to