This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 841f23c73e4 [FLINK-32074][checkpoint] Merge file across checkpoints
841f23c73e4 is described below
commit 841f23c73e4399df91112dd11ddca74f45ea5b37
Author: Zakelly <[email protected]>
AuthorDate: Thu Mar 14 15:26:55 2024 +0800
[FLINK-32074][checkpoint] Merge file across checkpoints
---
...AcrossCheckpointFileMergingSnapshotManager.java | 66 ++++++++
.../FileMergingSnapshotManagerBuilder.java | 30 +++-
.../checkpoint/filemerging/FileMergingType.java | 26 +++
.../state/TaskExecutorFileMergingManager.java | 5 +-
...ssCheckpointFileMergingSnapshotManagerTest.java | 171 ++++++++++++++++++++
...ava => FileMergingSnapshotManagerTestBase.java} | 169 ++------------------
...inCheckpointFileMergingSnapshotManagerTest.java | 176 +++++++++++++++++++++
.../FsMergingCheckpointStorageLocationTest.java | 5 +-
8 files changed, 487 insertions(+), 161 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
new file mode 100644
index 00000000000..941cb98a1c5
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merges files across
checkpoints. */
+public class AcrossCheckpointFileMergingSnapshotManager extends
FileMergingSnapshotManagerBase {
+
+ private final PhysicalFilePool filePool; // the only pool of this manager; created once in the constructor
+
+ public AcrossCheckpointFileMergingSnapshotManager(
+ String id, long maxFileSize, PhysicalFilePool.Type filePoolType,
Executor ioExecutor) {
+ super(id, maxFileSize, filePoolType, ioExecutor);
+ filePool = createPhysicalPool();
+ }
+
+ @Override
+ @Nonnull
+ protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+ SubtaskKey subtaskKey, long checkpointID, CheckpointedStateScope
scope)
+ throws IOException {
+ return filePool.pollFile(subtaskKey, scope); // checkpointID is not used: the file is picked by subtask key and scope only
+ }
+
+ @Override
+ protected void discardCheckpoint(long checkpointId) {} // empty body: nothing is done per checkpoint here. NOTE(review): presumably because a pooled file may hold data of several checkpoints - confirm
+
+ @Override
+ protected void returnPhysicalFileForNextReuse(
+ SubtaskKey subtaskKey, long checkpointId, PhysicalFile
physicalFile)
+ throws IOException {
+
+ if (shouldSyncAfterClosingLogicalFile) { // sync the written data before the file is offered back to the pool
+ FSDataOutputStream os = physicalFile.getOutputStream();
+ if (os != null) { // the physical file may have no output stream
+ os.sync();
+ }
+ }
+
+ if (!filePool.tryPutFile(subtaskKey, physicalFile)) { // the pool did not accept the file, so close it instead of keeping it for reuse
+ physicalFile.close();
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
index ca1d5b67501..3a99133bf20 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
@@ -29,6 +29,9 @@ public class FileMergingSnapshotManagerBuilder {
/** The id for identifying a {@link FileMergingSnapshotManager}. */
private final String id;
+ /** The file merging type. */
+ private final FileMergingType fileMergingType;
+
/** Max size for a file. TODO: Make it configurable. */
private long maxFileSize = 32 * 1024 * 1024;
@@ -42,8 +45,9 @@ public class FileMergingSnapshotManagerBuilder {
*
* @param id the id of the manager.
*/
- public FileMergingSnapshotManagerBuilder(String id) {
+ public FileMergingSnapshotManagerBuilder(String id, FileMergingType type) {
this.id = id;
+ this.fileMergingType = type;
}
/** Set the max file size. */
@@ -71,13 +75,27 @@ public class FileMergingSnapshotManagerBuilder {
/**
* Create file-merging snapshot manager based on configuration.
*
- * <p>TODO (FLINK-32074): Support another type of
FileMergingSnapshotManager that merges files
- * across different checkpoints.
- *
* @return the created manager.
*/
public FileMergingSnapshotManager build() {
- return new WithinCheckpointFileMergingSnapshotManager(
- id, maxFileSize, filePoolType, ioExecutor == null ?
Runnable::run : ioExecutor);
+ switch (fileMergingType) {
+ case MERGE_WITHIN_CHECKPOINT:
+ return new WithinCheckpointFileMergingSnapshotManager(
+ id,
+ maxFileSize,
+ filePoolType,
+ ioExecutor == null ? Runnable::run : ioExecutor);
+ case MERGE_ACROSS_CHECKPOINT:
+ return new AcrossCheckpointFileMergingSnapshotManager(
+ id,
+ maxFileSize,
+ filePoolType,
+ ioExecutor == null ? Runnable::run : ioExecutor);
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported type %s when creating file
merging manager",
+ fileMergingType));
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java
new file mode 100644
index 00000000000..f9fbc460b88
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+/** Defines whether checkpoint files are merged within or across checkpoint boundaries. */
+public enum FileMergingType {
+ /** Merge checkpoint files within checkpoint boundaries. */
+ MERGE_WITHIN_CHECKPOINT,
+ /** Merge checkpoint files across checkpoint boundaries. */
+ MERGE_ACROSS_CHECKPOINT
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
index 42d8a4a5469..2b48d8a07e7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
@@ -78,7 +79,9 @@ public class TaskExecutorFileMergingManager {
if (fileMergingSnapshotManager == null) {
// TODO FLINK-32440: choose different
FileMergingSnapshotManager by configuration
fileMergingSnapshotManager =
- new
FileMergingSnapshotManagerBuilder(jobId.toString()).build();
+ new FileMergingSnapshotManagerBuilder(
+ jobId.toString(),
FileMergingType.MERGE_WITHIN_CHECKPOINT)
+ .build();
fileMergingSnapshotManagerByJobId.put(jobId,
fileMergingSnapshotManager);
LOG.info("Registered new file merging snapshot manager for job
{}.", jobId);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
new file mode 100644
index 00000000000..475c13b5445
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AcrossCheckpointFileMergingSnapshotManager}. */
+public class AcrossCheckpointFileMergingSnapshotManagerTest
+ extends FileMergingSnapshotManagerTestBase {
+ @Override
+ FileMergingType getFileMergingType() {
+ return FileMergingType.MERGE_ACROSS_CHECKPOINT;
+ }
+
+ @Test
+ void testCreateAndReuseFiles() throws IOException {
+ try (FileMergingSnapshotManagerBase fmsm =
+ (FileMergingSnapshotManagerBase)
+ createFileMergingSnapshotManager(checkpointBaseDir)) {
+ fmsm.registerSubtaskForSharedStates(subtaskKey1);
+ fmsm.registerSubtaskForSharedStates(subtaskKey2);
+ // Firstly, we try shared state.
+ PhysicalFile file1 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.SHARED);
+ assertThat(file1.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ // allocate another while file1 is still in use
+ PhysicalFile file2 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.SHARED);
+ assertThat(file2.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file2).isNotEqualTo(file1);
+
+ // return for reuse
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
+
+ // allocate for another subtask
+ PhysicalFile file3 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey2, 0, CheckpointedStateScope.SHARED);
+ assertThat(file3.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.SHARED));
+ assertThat(file3).isNotEqualTo(file1);
+
+ // allocate for another checkpoint: the returned file is reused across checkpoints
+ PhysicalFile file4 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 1, CheckpointedStateScope.SHARED);
+ assertThat(file4.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file4).isEqualTo(file1);
+
+ // a physical file whose size is bigger than maxPhysicalFileSize
cannot be reused
+ file4.incSize(fmsm.maxPhysicalFileSize);
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 1, file4);
+ PhysicalFile file5 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 1, CheckpointedStateScope.SHARED);
+ assertThat(file5.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file5).isNotEqualTo(file4);
+
+ // Secondly, we try private state
+ PhysicalFile file6 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file6.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+
+ // allocate another; it must differ from the exclusive file that is still in use
+ PhysicalFile file7 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file7.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+ assertThat(file7).isNotEqualTo(file6);
+
+ // return for reuse
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6);
+
+ // allocate for another checkpoint
+ PhysicalFile file8 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 2, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file8.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+ assertThat(file8).isEqualTo(file6);
+
+ // return for reuse
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file8);
+
+ // allocate for this checkpoint but another subtask
+ PhysicalFile file9 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey2, 2, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file9.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.EXCLUSIVE));
+ assertThat(file9).isEqualTo(file6);
+
+ // a physical file whose size is bigger than maxPhysicalFileSize
cannot be reused
+ file9.incSize(fmsm.maxPhysicalFileSize);
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 2, file9);
+ PhysicalFile file10 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 2, CheckpointedStateScope.SHARED);
+ assertThat(file10.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file10).isNotEqualTo(file9);
+
+ assertThat(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.EXCLUSIVE))
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+ }
+ }
+
+ @Test
+ public void testCheckpointNotification() throws Exception {
+ try (FileMergingSnapshotManager fmsm =
createFileMergingSnapshotManager(checkpointBaseDir);
+ CloseableRegistry closeableRegistry = new CloseableRegistry())
{
+ FileMergingCheckpointStateOutputStream cp1Stream =
+ writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp1StateHandle =
cp1Stream.closeAndGetHandle();
+ fmsm.notifyCheckpointComplete(subtaskKey1, 1);
+ assertFileInManagedDir(fmsm, cp1StateHandle);
+
+ // complete checkpoint-2
+ FileMergingCheckpointStateOutputStream cp2Stream =
+ writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp2StateHandle =
cp2Stream.closeAndGetHandle();
+ fmsm.notifyCheckpointComplete(subtaskKey1, 2);
+ assertFileInManagedDir(fmsm, cp2StateHandle);
+
+ // subsume checkpoint-1: the file is expected to stay in place
+ assertThat(fileExists(cp1StateHandle)).isTrue();
+ fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
+ assertThat(fileExists(cp1StateHandle)).isTrue();
+
+ // abort checkpoint-3: the file is expected to stay in place
+ FileMergingCheckpointStateOutputStream cp3Stream =
+ writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp3StateHandle =
cp3Stream.closeAndGetHandle();
+ assertFileInManagedDir(fmsm, cp3StateHandle);
+ fmsm.notifyCheckpointAborted(subtaskKey1, 3);
+ assertThat(fileExists(cp3StateHandle)).isTrue();
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
similarity index 67%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
index be47d272900..d6553060e2e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
@@ -43,18 +43,20 @@ import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FileMergingSnapshotManager}. */
-public class FileMergingSnapshotManagerTest {
+public abstract class FileMergingSnapshotManagerTestBase {
- private final String tmId = "Testing";
+ final String tmId = "Testing";
- private final OperatorID operatorID = new OperatorID(289347923L,
75893479L);
+ final OperatorID operatorID = new OperatorID(289347923L, 75893479L);
- private SubtaskKey subtaskKey1;
- private SubtaskKey subtaskKey2;
+ SubtaskKey subtaskKey1;
+ SubtaskKey subtaskKey2;
- private Path checkpointBaseDir;
+ Path checkpointBaseDir;
- private int writeBufferSize;
+ int writeBufferSize;
+
+ abstract FileMergingType getFileMergingType();
@BeforeEach
public void setup(@TempDir java.nio.file.Path tempFolder) {
@@ -90,113 +92,6 @@ public class FileMergingSnapshotManagerTest {
}
}
- @Test
- void testCreateAndReuseFiles() throws IOException {
- try (FileMergingSnapshotManagerBase fmsm =
- (FileMergingSnapshotManagerBase)
- createFileMergingSnapshotManager(checkpointBaseDir)) {
- fmsm.registerSubtaskForSharedStates(subtaskKey1);
- fmsm.registerSubtaskForSharedStates(subtaskKey2);
- // firstly, we try shared state.
- PhysicalFile file1 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 0, CheckpointedStateScope.SHARED);
- assertThat(file1.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
- // allocate another
- PhysicalFile file2 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 0, CheckpointedStateScope.SHARED);
- assertThat(file2.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
- assertThat(file2).isNotEqualTo(file1);
-
- // return for reuse
- fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
-
- // allocate for another subtask
- PhysicalFile file3 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey2, 0, CheckpointedStateScope.SHARED);
- assertThat(file3.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.SHARED));
- assertThat(file3).isNotEqualTo(file1);
-
- // allocate for another checkpoint
- PhysicalFile file4 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 1, CheckpointedStateScope.SHARED);
- assertThat(file4.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
- assertThat(file4).isNotEqualTo(file1);
-
- // allocate for this checkpoint
- PhysicalFile file5 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 0, CheckpointedStateScope.SHARED);
- assertThat(file5.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
- assertThat(file5).isEqualTo(file1);
-
- // a physical file whose size is bigger than maxPhysicalFileSize
cannot be reused
- file5.incSize(fmsm.maxPhysicalFileSize);
- fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5);
- PhysicalFile file6 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 0, CheckpointedStateScope.SHARED);
- assertThat(file6.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
- assertThat(file6).isNotEqualTo(file5);
-
- // Secondly, we try private state
- PhysicalFile file7 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
- assertThat(file7.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
-
- // allocate another
- PhysicalFile file8 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
- assertThat(file8.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
- assertThat(file8).isNotEqualTo(file6);
-
- // return for reuse
- fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7);
-
- // allocate for another checkpoint
- PhysicalFile file9 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
- assertThat(file9.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
- assertThat(file9).isNotEqualTo(file7);
-
- // allocate for this checkpoint but another subtask
- PhysicalFile file10 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE);
- assertThat(file10.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.EXCLUSIVE));
- assertThat(file10).isEqualTo(file7);
-
- // a physical file whose size is bigger than maxPhysicalFileSize
cannot be reused
- file10.incSize(fmsm.maxPhysicalFileSize);
- fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10);
- PhysicalFile file11 =
- fmsm.getOrCreatePhysicalFileForCheckpoint(
- subtaskKey1, 0, CheckpointedStateScope.SHARED);
- assertThat(file11.getFilePath().getParent())
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
- assertThat(file11).isNotEqualTo(file10);
-
- assertThat(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.EXCLUSIVE))
- .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
- }
- }
-
@Test
void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException {
try (FileMergingSnapshotManagerBase fmsm =
@@ -378,38 +273,6 @@ public class FileMergingSnapshotManagerTest {
}
}
- @Test
- public void testCheckpointNotification() throws Exception {
- try (FileMergingSnapshotManager fmsm =
createFileMergingSnapshotManager(checkpointBaseDir);
- CloseableRegistry closeableRegistry = new CloseableRegistry())
{
- FileMergingCheckpointStateOutputStream cp1Stream =
- writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
- SegmentFileStateHandle cp1StateHandle =
cp1Stream.closeAndGetHandle();
- fmsm.notifyCheckpointComplete(subtaskKey1, 1);
- assertFileInManagedDir(fmsm, cp1StateHandle);
-
- // complete checkpoint-2
- FileMergingCheckpointStateOutputStream cp2Stream =
- writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
- SegmentFileStateHandle cp2StateHandle =
cp2Stream.closeAndGetHandle();
- fmsm.notifyCheckpointComplete(subtaskKey1, 2);
- assertFileInManagedDir(fmsm, cp2StateHandle);
-
- // subsume checkpoint-1
- assertThat(fileExists(cp1StateHandle)).isTrue();
- fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
- assertThat(fileExists(cp1StateHandle)).isFalse();
-
- // abort checkpoint-3
- FileMergingCheckpointStateOutputStream cp3Stream =
- writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
- SegmentFileStateHandle cp3StateHandle =
cp3Stream.closeAndGetHandle();
- assertFileInManagedDir(fmsm, cp3StateHandle);
- fmsm.notifyCheckpointAborted(subtaskKey1, 3);
- assertThat(fileExists(cp3StateHandle)).isFalse();
- }
- }
-
@Test
public void testConcurrentFileReusingWithBlockingPool() throws Exception {
try (FileMergingSnapshotManagerBase fmsm =
@@ -455,13 +318,13 @@ public class FileMergingSnapshotManagerTest {
}
}
- private FileMergingSnapshotManager createFileMergingSnapshotManager(Path
checkpointBaseDir)
+ FileMergingSnapshotManager createFileMergingSnapshotManager(Path
checkpointBaseDir)
throws IOException {
return createFileMergingSnapshotManager(
checkpointBaseDir, 32 * 1024 * 1024,
PhysicalFilePool.Type.NON_BLOCKING);
}
- private FileMergingSnapshotManager createFileMergingSnapshotManager(
+ FileMergingSnapshotManager createFileMergingSnapshotManager(
Path checkpointBaseDir, long maxFileSize, PhysicalFilePool.Type
filePoolType)
throws IOException {
FileSystem fs = LocalFileSystem.getSharedInstance();
@@ -479,7 +342,7 @@ public class FileMergingSnapshotManagerTest {
fs.mkdirs(taskOwnedStateDir);
}
FileMergingSnapshotManager fmsm =
- new FileMergingSnapshotManagerBuilder(tmId)
+ new FileMergingSnapshotManagerBuilder(tmId,
getFileMergingType())
.setMaxFileSize(maxFileSize)
.setFilePoolType(filePoolType)
.build();
@@ -493,13 +356,13 @@ public class FileMergingSnapshotManagerTest {
return fmsm;
}
- private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+ FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
long checkpointId, FileMergingSnapshotManager fmsm,
CloseableRegistry closeableRegistry)
throws IOException {
return writeCheckpointAndGetStream(checkpointId, fmsm,
closeableRegistry, 32);
}
- private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+ FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
long checkpointId,
FileMergingSnapshotManager fmsm,
CloseableRegistry closeableRegistry,
@@ -515,7 +378,7 @@ public class FileMergingSnapshotManagerTest {
return stream;
}
- private void assertFileInManagedDir(
+ void assertFileInManagedDir(
FileMergingSnapshotManager fmsm, SegmentFileStateHandle
stateHandle) {
assertThat(fmsm instanceof FileMergingSnapshotManagerBase).isTrue();
assertThat(stateHandle).isNotNull();
@@ -524,7 +387,7 @@ public class FileMergingSnapshotManagerTest {
assertThat(((FileMergingSnapshotManagerBase)
fmsm).isResponsibleForFile(filePath)).isTrue();
}
- private boolean fileExists(SegmentFileStateHandle stateHandle) throws
IOException {
+ boolean fileExists(SegmentFileStateHandle stateHandle) throws IOException {
assertThat(stateHandle).isNotNull();
Path filePath = stateHandle.getFilePath();
assertThat(filePath).isNotNull();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
new file mode 100644
index 00000000000..4128e5e7cd3
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link WithinCheckpointFileMergingSnapshotManager}. */
+public class WithinCheckpointFileMergingSnapshotManagerTest
+ extends FileMergingSnapshotManagerTestBase {
+ @Override
+ FileMergingType getFileMergingType() {
+ return FileMergingType.MERGE_WITHIN_CHECKPOINT;
+ }
+
+ @Test
+ void testCreateAndReuseFiles() throws IOException {
+ try (FileMergingSnapshotManagerBase fmsm =
+ (FileMergingSnapshotManagerBase)
+ createFileMergingSnapshotManager(checkpointBaseDir)) {
+ fmsm.registerSubtaskForSharedStates(subtaskKey1);
+ fmsm.registerSubtaskForSharedStates(subtaskKey2);
+ // firstly, we try shared state.
+ PhysicalFile file1 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.SHARED);
+ assertThat(file1.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ // allocate another
+ PhysicalFile file2 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.SHARED);
+ assertThat(file2.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file2).isNotEqualTo(file1);
+
+ // return for reuse
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
+
+ // allocate for another subtask
+ PhysicalFile file3 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey2, 0, CheckpointedStateScope.SHARED);
+ assertThat(file3.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.SHARED));
+ assertThat(file3).isNotEqualTo(file1);
+
+ // allocate for another checkpoint
+ PhysicalFile file4 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 1, CheckpointedStateScope.SHARED);
+ assertThat(file4.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file4).isNotEqualTo(file1);
+
+ // allocate for this checkpoint
+ PhysicalFile file5 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.SHARED);
+ assertThat(file5.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file5).isEqualTo(file1);
+
+ // a physical file whose size is bigger than maxPhysicalFileSize
cannot be reused
+ file5.incSize(fmsm.maxPhysicalFileSize);
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5);
+ PhysicalFile file6 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.SHARED);
+ assertThat(file6.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file6).isNotEqualTo(file5);
+
+ // Secondly, we try private state
+ PhysicalFile file7 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file7.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+
+ // allocate another
+ PhysicalFile file8 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file8.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+ assertThat(file8).isNotEqualTo(file6);
+
+ // return for reuse
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7);
+
+ // allocate for another checkpoint
+ PhysicalFile file9 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file9.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+ assertThat(file9).isNotEqualTo(file7);
+
+ // allocate for this checkpoint but another subtask
+ PhysicalFile file10 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE);
+ assertThat(file10.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.EXCLUSIVE));
+ assertThat(file10).isEqualTo(file7);
+
+ // a physical file whose size is bigger than maxPhysicalFileSize
cannot be reused
+ file10.incSize(fmsm.maxPhysicalFileSize);
+ fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10);
+ PhysicalFile file11 =
+ fmsm.getOrCreatePhysicalFileForCheckpoint(
+ subtaskKey1, 0, CheckpointedStateScope.SHARED);
+ assertThat(file11.getFilePath().getParent())
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.SHARED));
+ assertThat(file11).isNotEqualTo(file10);
+
+ assertThat(fmsm.getManagedDir(subtaskKey2,
CheckpointedStateScope.EXCLUSIVE))
+ .isEqualTo(fmsm.getManagedDir(subtaskKey1,
CheckpointedStateScope.EXCLUSIVE));
+ }
+ }
+
+ @Test
+ public void testCheckpointNotification() throws Exception {
+ try (FileMergingSnapshotManager fmsm =
createFileMergingSnapshotManager(checkpointBaseDir);
+ CloseableRegistry closeableRegistry = new CloseableRegistry())
{
+ FileMergingCheckpointStateOutputStream cp1Stream =
+ writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp1StateHandle =
cp1Stream.closeAndGetHandle();
+ fmsm.notifyCheckpointComplete(subtaskKey1, 1);
+ assertFileInManagedDir(fmsm, cp1StateHandle);
+
+ // complete checkpoint-2
+ FileMergingCheckpointStateOutputStream cp2Stream =
+ writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp2StateHandle =
cp2Stream.closeAndGetHandle();
+ fmsm.notifyCheckpointComplete(subtaskKey1, 2);
+ assertFileInManagedDir(fmsm, cp2StateHandle);
+
+ // subsume checkpoint-1
+ assertThat(fileExists(cp1StateHandle)).isTrue();
+ fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
+ assertThat(fileExists(cp1StateHandle)).isFalse();
+
+ // abort checkpoint-3
+ FileMergingCheckpointStateOutputStream cp3Stream =
+ writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp3StateHandle =
cp3Stream.closeAndGetHandle();
+ assertFileInManagedDir(fmsm, cp3StateHandle);
+ fmsm.notifyCheckpointAborted(subtaskKey1, 3);
+ assertThat(fileExists(cp3StateHandle)).isFalse();
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
index 37fbaa98f09..d7796cf7f4f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -220,7 +221,9 @@ public class FsMergingCheckpointStorageLocationTest {
private FileMergingSnapshotManager createFileMergingSnapshotManager(long
maxFileSize) {
FileMergingSnapshotManager mgr =
- new FileMergingSnapshotManagerBuilder(SNAPSHOT_MGR_ID).build();
+ new FileMergingSnapshotManagerBuilder(
+ SNAPSHOT_MGR_ID,
FileMergingType.MERGE_WITHIN_CHECKPOINT)
+ .build();
mgr.initFileSystem(
getSharedInstance(),