This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9446e8591a IGNITE-23212 Fix the situation when page replacement occurs 
after the completion of the checkpoint (#4437)
9446e8591a is described below

commit 9446e8591a24dfa44d866483ff2a7383cac99118
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Sep 25 16:35:49 2024 +0300

    IGNITE-23212 Fix the situation when page replacement occurs after the 
completion of the checkpoint (#4437)
---
 modules/page-memory/build.gradle                   |   3 +
 .../tree/persistence/ItPageReplacementTest.java    | 414 +++++++++++++++++++++
 .../pagememory/persistence/PageStoreWriter.java    |  12 +-
 .../persistence/PersistentPageMemory.java          | 102 +++--
 .../persistence/checkpoint/CheckpointManager.java  |   1 -
 .../checkpoint/CheckpointPageReplacement.java      | 138 +++++++
 .../persistence/checkpoint/CheckpointPages.java    | 161 ++++++--
 .../checkpoint/CheckpointProgressImpl.java         |  58 +++
 .../persistence/checkpoint/CheckpointWorkflow.java |  10 +-
 .../persistence/checkpoint/Checkpointer.java       |  19 +-
 .../replacement/DelayedDirtyPageWrite.java         |  91 +++--
 .../replacement/DelayedPageReplacementTracker.java |   4 +-
 .../checkpoint/CheckpointPageReplacementTest.java  | 107 ++++++
 .../checkpoint/CheckpointPagesTest.java            | 110 +++---
 .../checkpoint/CheckpointWorkflowTest.java         |   2 +-
 .../checkpoint/TestCheckpointUtils.java            |  12 +
 .../internal/pagememory/TestPageIoModule.java      |  44 ++-
 .../pagememory/persistence/FakePartitionMeta.java  |  44 ++-
 18 files changed, 1160 insertions(+), 172 deletions(-)

diff --git a/modules/page-memory/build.gradle b/modules/page-memory/build.gradle
index 66dc53d07a..329644ce4e 100644
--- a/modules/page-memory/build.gradle
+++ b/modules/page-memory/build.gradle
@@ -54,8 +54,11 @@ dependencies {
     testFixturesImplementation(testFixtures(project(':ignite-core')))
     testFixturesImplementation libs.mockito.core
     testFixturesImplementation libs.auto.service.annotations
+    testFixturesImplementation libs.jetbrains.annotations
 
     integrationTestImplementation project(':ignite-storage-api')
+    integrationTestImplementation project(':ignite-failure-handler')
+    integrationTestImplementation project(':ignite-file-io')
     integrationTestImplementation(testFixtures(project))
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
diff --git 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java
 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java
new file mode 100644
index 0000000000..4d38230447
--- /dev/null
+++ 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.tree.persistence;
+
+import static 
org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
+import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.components.LogSyncer;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.pagememory.DataRegion;
+import 
org.apache.ignite.internal.pagememory.TestPageIoModule.TestSimpleValuePageIo;
+import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
+import 
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
+import org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import 
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import 
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Integration tests for testing page replacement. */
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+public class ItPageReplacementTest extends BaseIgniteAbstractTest {
+    private static final String NODE_NAME = "test";
+
+    private static final int GROUP_ID = 1;
+
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_COUNT = 1;
+
+    private static final int PAGE_SIZE = 512;
+
+    private static final int PAGE_COUNT = 1024;
+
+    private static final int MAX_MEMORY_SIZE = PAGE_COUNT * PAGE_SIZE;
+
+    private static final int CPUS = Math.min(4, 
Runtime.getRuntime().availableProcessors());
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration("mock.checkpointThreads = 1")
+    private PageMemoryCheckpointConfiguration checkpointConfig;
+
+    @InjectConfiguration(
+            polymorphicExtensions = 
PersistentPageMemoryProfileConfigurationSchema.class,
+            value = "mock = {"
+                    + "engine=aipersist, "
+                    + "size=" + MAX_MEMORY_SIZE
+                    + "}"
+    )
+    private StorageProfileConfiguration storageProfileCfg;
+
+    private FilePageStoreManager filePageStoreManager;
+
+    private PartitionMetaManager partitionMetaManager;
+
+    private CheckpointManager checkpointManager;
+
+    private PersistentPageMemory pageMemory;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        FailureManager failureManager = mock(FailureManager.class);
+
+        var ioRegistry = new TestPageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        filePageStoreManager = new FilePageStoreManager(
+                NODE_NAME,
+                workDir,
+                new RandomAccessFileIoFactory(),
+                PAGE_SIZE,
+                failureManager
+        );
+
+        partitionMetaManager = new PartitionMetaManager(ioRegistry, PAGE_SIZE, 
FakePartitionMeta.FACTORY);
+
+        var dataRegionList = new ArrayList<DataRegion<PersistentPageMemory>>();
+
+        checkpointManager = new CheckpointManager(
+                NODE_NAME,
+                null,
+                null,
+                failureManager,
+                checkpointConfig,
+                filePageStoreManager,
+                partitionMetaManager,
+                dataRegionList,
+                ioRegistry,
+                mock(LogSyncer.class),
+                PAGE_SIZE
+        );
+
+        pageMemory = new PersistentPageMemory(
+                (PersistentPageMemoryProfileConfiguration) 
fixConfiguration(storageProfileCfg),
+                ioRegistry,
+                new long[]{MAX_MEMORY_SIZE},
+                10 * MiB,
+                filePageStoreManager,
+                null,
+                (pageMemory0, fullPageId, buf) -> 
checkpointManager.writePageToDeltaFilePageStore(pageMemory0, fullPageId, buf, 
true),
+                checkpointManager.checkpointTimeoutLock(),
+                PAGE_SIZE
+        );
+
+        dataRegionList.add(() -> pageMemory);
+
+        filePageStoreManager.start();
+        checkpointManager.start();
+        pageMemory.start();
+
+        createPartitionFilePageStoresIfMissing();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                checkpointManager == null ? null : checkpointManager::stop,
+                pageMemory == null ? null : () -> pageMemory.stop(true),
+                filePageStoreManager == null ? null : 
filePageStoreManager::stop
+        );
+    }
+
+    /** Checks that page replacement will occur after the start of the 
checkpoint and before its end. */
+    @Test
+    void testPageReplacement() throws Throwable {
+        var startWritePagesOnCheckpointFuture = new CompletableFuture<Void>();
+        var continueWritePagesOnCheckpointFuture = new 
CompletableFuture<Void>();
+
+        var startWritePagesOnPageReplacementFuture = new 
CompletableFuture<Void>();
+
+        CheckpointProgress checkpointProgress = inCheckpointReadLock(() -> {
+            // We make only one dirty page for the checkpoint to work.
+            createAndFillTestSimpleValuePages(1);
+
+            FilePageStore filePageStore = filePageStoreManager.getStore(new 
GroupPartitionId(GROUP_ID, PARTITION_ID));
+
+            // First time the method should be invoked by the checkpoint 
writer, let's hold it for page replacement.
+            doAnswer(invocation -> {
+                startWritePagesOnCheckpointFuture.complete(null);
+
+                assertThat(continueWritePagesOnCheckpointFuture, 
willCompleteSuccessfully());
+
+                return invocation.callRealMethod();
+            }).doAnswer(invocation -> {
+                // Second time the method should be invoked on page 
replacement.
+                startWritePagesOnPageReplacementFuture.complete(null);
+
+                return invocation.callRealMethod();
+            }).when(filePageStore).getOrCreateNewDeltaFile(any(), any());
+
+            // Trigger checkpoint so that it writes a meta page and one dirty 
one. We do it under a read lock to ensure that the background
+            // does not start after the lock is released.
+            return checkpointManager.forceCheckpoint("for test");
+        });
+
+        CompletableFuture<Void> finishCheckpointFuture = 
checkpointProgress.futureFor(FINISHED);
+
+        // Let's wait for the checkpoint writer to start writing the first 
page.
+        assertThat(startWritePagesOnCheckpointFuture, 
willCompleteSuccessfully());
+        // Let's make sure that no one has tried to write another page.
+        assertFalse(startWritePagesOnPageReplacementFuture.isDone());
+
+        // We will create dirty pages until the page replacement occurs.
+        // Asynchronously so as not to get into dead locks or something like 
that.
+        assertThat(
+                runAsync(() -> inCheckpointReadLock(
+                        () -> createAndFillTestSimpleValuePages(() -> 
!startWritePagesOnPageReplacementFuture.isDone())
+                )),
+                willCompleteSuccessfully()
+        );
+        assertFalse(finishCheckpointFuture.isDone());
+
+        continueWritePagesOnCheckpointFuture.complete(null);
+        assertThat(finishCheckpointFuture, willCompleteSuccessfully());
+        assertTrue(pageMemory.pageReplacementOccurred());
+    }
+
+    @Test
+    void 
testFsyncDeltaFilesWillNotStartOnCheckpointUntilPageReplacementIsComplete() 
throws Exception {
+        var startWritePagesOnCheckpointFuture = new CompletableFuture<Void>();
+        var continueWritePagesOnCheckpointFuture = new 
CompletableFuture<Void>();
+
+        var startWritePagesOnPageReplacementFuture = new 
CompletableFuture<Void>();
+        var continueWritePagesOnPageReplacementFuture = new 
CompletableFuture<Void>();
+
+        var deltaFileIoFuture = new CompletableFuture<DeltaFilePageStoreIo>();
+
+        CheckpointProgress checkpointProgress = inCheckpointReadLock(() -> {
+            // We make only one dirty page for the checkpoint to work.
+            createAndFillTestSimpleValuePages(1);
+
+            FilePageStore filePageStore = filePageStoreManager.getStore(new 
GroupPartitionId(GROUP_ID, PARTITION_ID));
+
+            // First time the method should be invoked by the checkpoint 
writer, let's hold it for page replacement.
+            doAnswer(invocation -> {
+                CompletableFuture<DeltaFilePageStoreIo> callRealMethodResult =
+                        (CompletableFuture<DeltaFilePageStoreIo>) 
invocation.callRealMethod();
+
+                callRealMethodResult = callRealMethodResult
+                        .handle((deltaFilePageStoreIo, throwable) -> {
+                            if (throwable != null) {
+                                
deltaFileIoFuture.completeExceptionally(throwable);
+
+                                throw new CompletionException(throwable);
+                            } else {
+                                deltaFilePageStoreIo = 
spy(deltaFilePageStoreIo);
+
+                                
deltaFileIoFuture.complete(deltaFilePageStoreIo);
+
+                                return deltaFilePageStoreIo;
+                            }
+                        });
+
+                startWritePagesOnCheckpointFuture.complete(null);
+
+                assertThat(continueWritePagesOnCheckpointFuture, 
willCompleteSuccessfully());
+
+                return callRealMethodResult;
+            }).doAnswer(invocation -> {
+                // Second time the method should be invoked on page 
replacement, let's hold it.
+                startWritePagesOnPageReplacementFuture.complete(null);
+
+                assertThat(continueWritePagesOnPageReplacementFuture, 
willCompleteSuccessfully());
+
+                return invocation.callRealMethod();
+            }).when(filePageStore).getOrCreateNewDeltaFile(any(), any());
+
+            doReturn(deltaFileIoFuture).when(filePageStore).getNewDeltaFile();
+
+            // Trigger checkpoint so that it writes a meta page and one dirty 
one. We do it under a read lock to ensure that the background
+            // does not start after the lock is released.
+            return checkpointManager.forceCheckpoint("for test");
+        });
+
+        CompletableFuture<Void> finishCheckpointFuture = 
checkpointProgress.futureFor(FINISHED);
+
+        // Let's wait for the checkpoint writer to start writing the first 
page.
+        assertThat(startWritePagesOnCheckpointFuture, 
willCompleteSuccessfully());
+        assertThat(deltaFileIoFuture, willCompleteSuccessfully());
+
+        // We will create dirty pages until the page replacement occurs.
+        // Asynchronously so as not to get into dead locks or something like 
that.
+
+        CompletableFuture<Void> createPagesForPageReplacementFuture = runAsync(
+                () -> inCheckpointReadLock(() -> 
createAndFillTestSimpleValuePages(() -> 
!startWritePagesOnPageReplacementFuture.isDone()))
+        );
+        assertThat(startWritePagesOnPageReplacementFuture, 
willCompleteSuccessfully());
+        assertFalse(createPagesForPageReplacementFuture.isDone());
+
+        // Let's release the checkpoint and make sure that it does not 
complete and fsync does not occur until the page replacement is
+        // complete.
+        continueWritePagesOnCheckpointFuture.complete(null);
+        assertThat(finishCheckpointFuture, willTimeoutFast());
+        // 250 by analogy with willTimeoutFast().
+        verify(deltaFileIoFuture.join(), timeout(250).times(0)).sync();
+        assertFalse(createPagesForPageReplacementFuture.isDone());
+
+        // Let's release the page replacement and make sure everything ends 
well.
+        continueWritePagesOnPageReplacementFuture.complete(null);
+
+        assertThat(finishCheckpointFuture, willCompleteSuccessfully());
+        assertThat(createPagesForPageReplacementFuture, 
willCompleteSuccessfully());
+        verify(deltaFileIoFuture.join()).sync();
+    }
+
+    private void createAndFillTestSimpleValuePage(long pageId) throws 
Exception {
+        long page = pageMemory.acquirePage(GROUP_ID, pageId);
+
+        try {
+            long pageAddr = pageMemory.writeLock(GROUP_ID, pageId, page);
+
+            try {
+                new TestSimpleValuePageIo().initNewPage(pageAddr, pageId, 
PAGE_SIZE);
+
+                TestSimpleValuePageIo.setLongValue(pageAddr, pageIndex(pageId) 
* 3L);
+            } finally {
+                pageMemory.writeUnlock(GROUP_ID, pageId, page, true);
+            }
+        } finally {
+            pageMemory.releasePage(GROUP_ID, pageId, page);
+        }
+    }
+
+    private void createPartitionFilePageStoresIfMissing() throws Exception {
+        ByteBuffer buffer = allocateBuffer(PAGE_SIZE);
+
+        try {
+            for (int partitionId = 0; partitionId < PARTITION_COUNT; 
partitionId++) {
+                var groupPartitionId = new GroupPartitionId(GROUP_ID, 
partitionId);
+
+                FilePageStore filePageStore = 
filePageStoreManager.readOrCreateStore(groupPartitionId, buffer.rewind());
+
+                filePageStore.ensure();
+
+                PartitionMeta partitionMeta = 
partitionMetaManager.readOrCreateMeta(
+                        null,
+                        groupPartitionId,
+                        filePageStore,
+                        buffer.rewind()
+                );
+
+                filePageStore.pages(partitionMeta.pageCount());
+
+                filePageStore.setPageAllocationListener(pageIdx -> {
+                    assert 
checkpointManager.checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                    CheckpointProgress checkpointProgress = 
checkpointManager.lastCheckpointProgress();
+
+                    partitionMeta.incrementPageCount(checkpointProgress == 
null ? null : checkpointProgress.id());
+                });
+
+                filePageStoreManager.addStore(groupPartitionId, 
spy(filePageStore));
+                partitionMetaManager.addMeta(groupPartitionId, partitionMeta);
+            }
+        } finally {
+            freeBuffer(buffer);
+        }
+    }
+
+    private <V> V inCheckpointReadLock(Callable<V> callable) throws Exception {
+        checkpointManager.checkpointTimeoutLock().checkpointReadLock();
+
+        try {
+            return callable.call();
+        } finally {
+            checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
+        }
+    }
+
+    private void inCheckpointReadLock(RunnableX runnableX) throws Throwable {
+        checkpointManager.checkpointTimeoutLock().checkpointReadLock();
+
+        try {
+            runnableX.run();
+        } finally {
+            checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
+        }
+    }
+
+    private void createAndFillTestSimpleValuePages(int pageCount) throws 
Exception {
+        for (int i = 0; i < pageCount; i++) {
+            createAndFillTestSimpleValuePage(pageMemory.allocatePage(null, 
GROUP_ID, PARTITION_ID, FLAG_DATA));
+        }
+    }
+
+    private void createAndFillTestSimpleValuePages(BooleanSupplier 
continuePredicate) throws Exception {
+        while (continuePredicate.getAsBoolean()) {
+            createAndFillTestSimpleValuePage(pageMemory.allocatePage(null, 
GROUP_ID, PARTITION_ID, FLAG_DATA));
+        }
+    }
+}
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
index 54783ce690..1c43ad6675 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
@@ -18,22 +18,20 @@
 package org.apache.ignite.internal.pagememory.persistence;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 
-/**
- * Interface for write page to {@link PageStore}.
- */
+/** Interface for writing dirty pages to {@link PageStore} on checkpoint. */
 public interface PageStoreWriter {
     /**
-     * Callback for write page. {@link PersistentPageMemory} will copy page 
content to buffer before call.
+     * Writes page to {@link PageStore}. {@link PersistentPageMemory} will 
copy page content to buffer before call.
      *
      * @param fullPageId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by the {@link
-     * PersistentPageMemory#beginCheckpoint(CompletableFuture)} method call.
+     * PersistentPageMemory#beginCheckpoint} method call.
      * @param buf Temporary buffer to write changes into.
-     * @param tag {@code Partition generation} if data was read.
+     * @param tag {@code Partition generation} if data was read. {@link 
PersistentPageMemory#TRY_AGAIN_TAG} if failed to get a write lock
+     *      for a page and need to try writing again later.
      * @throws IgniteInternalCheckedException If write page failed.
      */
     void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws 
IgniteInternalCheckedException;
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 2454435dcc..8f4e2ec164 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -68,7 +68,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,8 +92,10 @@ import 
org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointMetricsTracker;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
 import 
org.apache.ignite.internal.pagememory.persistence.replacement.ClockPageReplacementPolicyFactory;
+import 
org.apache.ignite.internal.pagememory.persistence.replacement.DelayedDirtyPageWrite;
 import 
org.apache.ignite.internal.pagememory.persistence.replacement.DelayedPageReplacementTracker;
 import 
org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicy;
 import 
org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicyFactory;
@@ -509,7 +510,6 @@ public class PersistentPageMemory implements PageMemory {
         return dirty(absPtr);
     }
 
-    /** {@inheritDoc} */
     @Override
     public long allocatePageNoReuse(int grpId, int partId, byte flags) throws 
IgniteInternalCheckedException {
         assert partId >= 0 && partId <= MAX_PARTITION_ID : partId;
@@ -526,7 +526,6 @@ public class PersistentPageMemory implements PageMemory {
 
         seg.writeLock().lock();
 
-
         try {
             FullPageId fullId = new FullPageId(pageId, grpId);
 
@@ -587,10 +586,9 @@ public class PersistentPageMemory implements PageMemory {
             throw e;
         } finally {
             seg.writeLock().unlock();
-        }
 
-        // Finish replacement only when an exception wasn't thrown otherwise 
it possible to corrupt B+Tree.
-        delayedPageReplacementTracker.delayedPageWrite().finishReplacement();
+            
delayedPageReplacementTracker.delayedPageWrite().flushCopiedPageIfExists();
+        }
 
         return pageId;
     }
@@ -800,7 +798,7 @@ public class PersistentPageMemory implements PageMemory {
         } finally {
             seg.writeLock().unlock();
 
-            
delayedPageReplacementTracker.delayedPageWrite().finishReplacement();
+            
delayedPageReplacementTracker.delayedPageWrite().flushCopiedPageIfExists();
 
             if (readPageFromStore) {
                 assert lockedPageAbsPtr != -1 : "Page is expected to have a 
valid address [pageId=" + fullId
@@ -1495,33 +1493,61 @@ public class PersistentPageMemory implements PageMemory 
{
         }
 
         /**
-         * Prepares a page removal for page replacement, if needed.
+         * Tries to replace the page.
          *
-         * @param fullPageId Candidate page full ID.
-         * @param absPtr Absolute pointer of the page to evict.
-         * @return {@code True} if it is ok to replace this page, {@code 
false} if another page should be selected.
-         * @throws IgniteInternalCheckedException If failed to write page to 
the underlying store during eviction.
+         * <p>The replacement will be successful if the following conditions 
are met:</p>
+         * <ul>
+         *     <li>Page is pinned by another thread, such as a checkpoint 
dirty page writer or in the process of being modified - nothing
+         *     needs to be done.</li>
+         *     <li>Page is not dirty - just remove it from the loaded 
pages.</li>
+         *     <li>Page is dirty, there is a checkpoint in the process and the 
following sub-conditions are met:</li>
+         *     <ul>
+         *         <li>Page belongs to current checkpoint.</li>
+         *         <li>If the dirty page sorting phase is complete, otherwise 
we wait for it. This is necessary so that we can safely
+         *         create partition delta files in which the dirty page order 
must be preserved.</li>
+         *         <li>If the checkpoint dirty page writer has not started 
writing the page or has already written it.</li>
+         *     </ul>
+         * </ul>
+         *
+         * <p>It is expected that if the method returns {@code true}, it will 
not be invoked again for the same page ID.</p>
+         *
+         * <p>If we intend to replace a page, it is important for us to block 
the delta file fsync phase of the checkpoint to preserve data
+         * consistency. The phase should not start until all dirty pages are 
written by the checkpoint writer, but for page replacement we
+         * must block it ourselves.</p>
+         *
+         * @param fullPageId Candidate page ID.
+         * @param absPtr Absolute pointer to the candidate page.
+         * @return {@code True} if the page replacement was successful, 
otherwise need to try another one.
+         * @throws IgniteInternalCheckedException If any error occurred while 
waiting for the dirty page sorting phase to complete at a
+         *      checkpoint.
          */
         public boolean tryToRemovePage(FullPageId fullPageId, long absPtr) 
throws IgniteInternalCheckedException {
             assert writeLock().isHeldByCurrentThread();
 
             if (isAcquired(absPtr)) {
+                // Page is pinned by another thread, such as a checkpoint 
dirty page writer or in the process of being modified - nothing
+                // needs to be done.
                 return false;
             }
 
             if (isDirty(absPtr)) {
                 CheckpointPages checkpointPages = this.checkpointPages;
-                // Can evict a dirty page only if should be written by a 
checkpoint.
-                // These pages does not have tmp buffer.
-                if (checkpointPages != null && 
checkpointPages.allowToSave(fullPageId)) {
-                    WriteDirtyPage writeDirtyPage = 
delayedPageReplacementTracker.delayedPageWrite();
-
-                    writeDirtyPage.write(PersistentPageMemory.this, 
fullPageId, wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()));
+                // Can replace a dirty page only if it should be written by a 
checkpoint.
+                // Safe to invoke because we keep segment write lock and the 
checkpoint writer must remove pages on the segment read lock.
+                if (checkpointPages != null && 
checkpointPages.removeOnPageReplacement(fullPageId)) {
+                    checkpointPages.blockFsyncOnPageReplacement(fullPageId);
+
+                    DelayedDirtyPageWrite delayedDirtyPageWrite = 
delayedPageReplacementTracker.delayedPageWrite();
+
+                    delayedDirtyPageWrite.copyPageToTemporaryBuffer(
+                            PersistentPageMemory.this,
+                            fullPageId,
+                            wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()),
+                            checkpointPages
+                    );
 
                     setDirty(fullPageId, absPtr, false, true);
 
-                    checkpointPages.markAsSaved(fullPageId);
-
                     loadedPages.remove(fullPageId.groupId(), 
fullPageId.effectivePageId());
 
                     return true;
@@ -1826,23 +1852,23 @@ public class PersistentPageMemory implements PageMemory 
{
     /**
      * Returns {@code true} if remove successfully.
      *
-     * @param fullPageId Page ID to clear.
+     * @param fullPageId Page ID to remove.
      */
-    boolean clearCheckpoint(FullPageId fullPageId) {
+    private boolean removeOnCheckpoint(FullPageId fullPageId) {
         Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
 
         CheckpointPages pages0 = seg.checkpointPages;
 
-        assert pages0 != null;
+        assert pages0 != null : fullPageId;
 
-        return pages0.markAsSaved(fullPageId);
+        return pages0.removeOnCheckpoint(fullPageId);
     }
 
     /**
      * Makes a full copy of the dirty page for checkpointing, then marks the 
page as not dirty.
      *
      * @param absPtr Absolute page pointer.
-     * @param fullId Full page id.
+     * @param fullId Full page ID.
      * @param buf Buffer for copy page content for future write via {@link 
PageStoreWriter}.
      * @param pageSingleAcquire Page is acquired only once. We don't pin the 
page second time (until page will not be copied) in case
      *      checkpoint temporary buffer is used.
@@ -1883,7 +1909,7 @@ public class PersistentPageMemory implements PageMemory {
             return;
         }
 
-        if (!clearCheckpoint(fullId)) {
+        if (!removeOnCheckpoint(fullId)) {
             rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
 
             if (!pageSingleAcquire) {
@@ -1946,8 +1972,8 @@ public class PersistentPageMemory implements PageMemory {
     /**
      * Prepare page for write during checkpoint. {@link PageStoreWriter} will 
be called when the page will be ready to write.
      *
-     * @param fullId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by the {@link
-     * #beginCheckpoint(CompletableFuture)} method call.
+     * @param fullId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by the {@link #beginCheckpoint}
+     *      method call.
      * @param buf Temporary buffer to write changes into.
      * @param pageStoreWriter Checkpoint page write context.
      * @param tracker Checkpoint metrics tracker.
@@ -2067,11 +2093,11 @@ public class PersistentPageMemory implements PageMemory 
{
      * begun, the modifications will be written to a temporary buffer which 
will be flushed to the main memory after the checkpointing
      * finished. This method must be called when no concurrent operations on 
pages are performed.
      *
-     * @param allowToReplace The sign which allows replacing pages from a 
checkpoint by page replacer.
+     * @param checkpointProgress Progress of the current checkpoint.
      * @return Collection view of dirty page IDs.
      * @throws IgniteInternalException If checkpoint has been already started 
and was not finished.
      */
-    public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> 
allowToReplace) throws IgniteInternalException {
+    public Collection<FullPageId> beginCheckpoint(CheckpointProgress 
checkpointProgress) throws IgniteInternalException {
         if (segments == null) {
             return List.of();
         }
@@ -2081,11 +2107,15 @@ public class PersistentPageMemory implements PageMemory 
{
         for (int i = 0; i < segments.length; i++) {
             Segment segment = segments[i];
 
-            assert segment.checkpointPages == null : "Failed to begin 
checkpoint (it is already in progress)";
+            assert segment.checkpointPages == null : String.format(
+                    "Failed to begin checkpoint (it is already in progress): 
[storageProfile=%s, segmentIdx=%s]",
+                    storageProfileView.name(), i
+            );
 
-            Set<FullPageId> segmentDirtyPages = (dirtyPageIds[i] = 
segment.dirtyPages);
+            Set<FullPageId> segmentDirtyPages = segment.dirtyPages;
+            dirtyPageIds[i] = segmentDirtyPages;
 
-            segment.checkpointPages = new CheckpointPages(segmentDirtyPages, 
allowToReplace);
+            segment.checkpointPages = new CheckpointPages(segmentDirtyPages, 
checkpointProgress);
 
             segment.resetDirtyPages();
         }
@@ -2123,4 +2153,10 @@ public class PersistentPageMemory implements PageMemory {
 
         return checkpointPool.size() > checkpointBufLimit;
     }
+
+    /** Returns {@code true} if a page replacement has occurred at least once. 
*/
+    @TestOnly
+    public boolean pageReplacementOccurred() {
+        return pageReplacementWarned > 0;
+    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 9bb883597c..4d85b72087 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -309,7 +309,6 @@ public class CheckpointManager {
         CheckpointProgress lastCheckpointProgress = lastCheckpointProgress();
 
         assert lastCheckpointProgress != null : "Checkpoint has not happened 
yet";
-        // TODO https://issues.apache.org/jira/browse/IGNITE-23212 This 
assertion fails sometimes.
         assert lastCheckpointProgress.inProgress() : "Checkpoint must be in 
progress";
 
         CheckpointDirtyPages pagesToWrite = 
lastCheckpointProgress.pagesToWrite();
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java
new file mode 100644
index 0000000000..782c3fe34f
--- /dev/null
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for synchronizing page replacement and the beginning of the 
fsync phase at a checkpoint.
+ *
+ * <p>For data consistency, it is important for us that the fsync delta file 
phase begins strictly after all page replacements are
+ * completed.</p>
+ *
+ * <p>Usage:</p>
+ * <ul>
+ *     <li>{@link #block} - before you need to perform a page replacement.</li>
+ *     <li>{@link #unblock} - after the page replacement has finished and 
written to disk. The method must be invoked even if any error
+ *     occurred, so as not to hang a checkpoint.</li>
+ *     <li>{@link #stopBlocking} - must be invoked before the start of the 
fsync phase on the checkpoint and wait for the future to
+ *     complete in order to safely perform the phase.</li>
+ * </ul>
+ *
+ * <p>Thread safe.</p>
+ */
+class CheckpointPageReplacement {
+    /** IDs of pages for which page replacement is in progress. */
+    private final Set<FullPageId> pageIds = ConcurrentHashMap.newKeySet();
+
+    private final CompletableFuture<Void> stopBlockingFuture = new 
CompletableFuture<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Block the start of the fsync phase at a checkpoint before replacing the 
page.
+     *
+     * <p>It is expected that the method will be invoked once and after that 
the {@link #unblock} will be invoked on the same page.</p>
+     *
+     * <p>It is expected that the method will not be invoked after {@link 
#stopBlocking}, since by the start of the fsync phase, write
+     * dirty pages at the checkpoint should be complete and no new page 
replacements should be started.</p>
+     *
+     * @param pageId Page ID for which page replacement will begin.
+     * @see #unblock(FullPageId, Throwable)
+     * @see #stopBlocking()
+     */
+    void block(FullPageId pageId) {
+        boolean enterBusy = busyLock.enterBusy();
+
+        assert enterBusy : "Method should not be invoked after the fsync phase 
has started for any page: " + pageId;
+
+        try {
+            boolean added = pageIds.add(pageId);
+
+            assert added : "Page is already in the process of being replaced: 
" + pageId;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Unblocks the start of the fsync phase at a checkpoint after the page 
replacement is completed.
+     *
+     * <p>It is expected that the method will be invoked once and after the 
{@link #block} for same page ID.</p>
+     *
+     * <p>The fsync phase will only be started after page replacement has been 
completed for all pages for which
+     * {@link #block} was invoked before {@link #stopBlocking} was invoked, or 
no page replacement occurred at all.</p>
+     *
+     * <p>If an error occurs on any page replacement during one checkpoint, 
the future from {@link #stopBlocking} will complete with the
+     * first error.</p>
+     *
+     * <p>The method must be invoked even if any error occurred, so as not to 
hang a checkpoint.</p>
+     *
+     * @param pageId Page ID for which the page replacement has ended.
+     * @param error Error on page replacement, {@code null} if missing.
+     * @see #block(FullPageId)
+     * @see #stopBlocking()
+     */
+    void unblock(FullPageId pageId, @Nullable Throwable error) {
+        boolean removed = pageIds.remove(pageId);
+
+        assert removed : "Replacement for the page either did not start or 
ended: " + pageId;
+
+        if (error != null) {
+            stopBlockingFuture.completeExceptionally(error);
+
+            return;
+        }
+
+        if (!busyLock.enterBusy()) {
+            if (pageIds.isEmpty()) {
+                stopBlockingFuture.complete(null);
+            }
+        } else {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops new blocks before the fsync phase starts at a checkpoint.
+     *
+     * @return Future that will be completed successfully if all {@link 
#block} are completed before the current method is invoked, either
+     *      if there were none, or with an error from the first {@link 
#unblock}.
+     * @see #block(FullPageId)
+     * @see #unblock(FullPageId, Throwable)
+     */
+    CompletableFuture<Void> stopBlocking() {
+        if (stopGuard.compareAndSet(false, true)) {
+            busyLock.block();
+        }
+
+        if (pageIds.isEmpty()) {
+            stopBlockingFuture.complete(null);
+        }
+
+        return stopBlockingFuture;
+    }
+}
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
index dd7c139ba2..f6897a03b8 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -17,75 +17,184 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 
+import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * View of pages which should be stored during current checkpoint.
+ * Class contains dirty pages of the segment that will need to be written at a 
checkpoint or page replacement. It also contains helper
+ * methods before writing pages.
+ *
+ * <p>For correct parallel operation of the checkpoint writer and page 
replacement, external synchronization must be used.</p>
+ *
+ * @see PersistentPageMemory#checkpointWritePage(FullPageId, ByteBuffer, 
PageStoreWriter, CheckpointMetricsTracker)
+ * @see PersistentPageMemory.Segment#tryToRemovePage(FullPageId, long)
  */
 public class CheckpointPages {
-    private final Set<FullPageId> segmentPages;
+    private final Set<FullPageId> pageIds;
 
-    private final CompletableFuture<?> allowToReplace;
+    private final CheckpointProgressImpl checkpointProgress;
 
     /**
      * Constructor.
      *
-     * @param pages Pages which would be stored to disk in current checkpoint, 
does not copy the set.
-     * @param replaceFuture The sign which allows replacing pages from a 
checkpoint by page replacer.
+     * @param pageIds Dirty page IDs in the segment that should be written at 
a checkpoint or page replacement.
+     * @param checkpointProgress Progress of the current checkpoint at which 
the object was created.
      */
-    public CheckpointPages(Set<FullPageId> pages, CompletableFuture<?> 
replaceFuture) {
-        segmentPages = pages;
-        allowToReplace = replaceFuture;
+    public CheckpointPages(Set<FullPageId> pageIds, CheckpointProgress 
checkpointProgress) {
+        this.pageIds = pageIds;
+        this.checkpointProgress = (CheckpointProgressImpl) checkpointProgress;
     }
 
     /**
-     * Returns {@code true} If fullPageId is allowable to store to disk.
+     * Removes a page ID that would be written at page replacement. Must be 
invoked before writing a page to disk.
+     *
+     * <p>Page will be removed only if the dirty page sorting phase at the 
checkpoint has completed or will synchronously wait for it to
+     * complete.</p>
      *
-     * @param fullPageId Page id for checking.
-     * @throws IgniteInternalCheckedException If the waiting sign which allows 
replacing pages from a checkpoint by page replacer fails.
+     * <p>To keep the data consistent, we need to use {@link 
#blockFsyncOnPageReplacement} and {@link #unblockFsyncOnPageReplacement} after
+     * calling the current method to prevent the fsync checkpoint phase from 
starting.</p>
+     *
+     * @param pageId Page ID to remove.
+     * @return {@code True} if the page was removed by the current method 
invoke, {@code false} if the page was already removed by another
+     *      removes or did not exist.
+     * @throws IgniteInternalCheckedException If any error occurred while 
waiting for the dirty page sorting phase to complete at a
+     *         checkpoint.
+     * @see #removeOnCheckpoint(FullPageId)
+     * @see #blockFsyncOnPageReplacement(FullPageId)
+     * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
      */
-    public boolean allowToSave(FullPageId fullPageId) throws 
IgniteInternalCheckedException {
+    public boolean removeOnPageReplacement(FullPageId pageId) throws 
IgniteInternalCheckedException {
         try {
             // Uninterruptibly is important because otherwise in case of 
interrupt of client thread node would be stopped.
-            getUninterruptibly(allowToReplace);
+            getUninterruptibly(checkpointProgress.futureFor(PAGES_SORTED));
         } catch (ExecutionException e) {
             throw new IgniteInternalCheckedException(e.getCause());
         } catch (CancellationException e) {
             throw new IgniteInternalCheckedException(e);
         }
 
-        return segmentPages.contains(fullPageId);
+        return pageIds.remove(pageId);
     }
 
     /**
-     * Returns {@code true} If fullPageId is candidate to stored to disk by 
current checkpoint.
+     * Removes a page ID that would be written at checkpoint. Must be invoked 
before writing a page to disk.
+     *
+     * <p>We don't need to block the fsync phase of the checkpoint, since it 
won't start until all dirty pages have been written at the
+     * checkpoint, except those for which page replacement has occurred.</p>
      *
-     * @param fullPageId Page id for checking.
+     * @param pageId Page ID to remove.
+     * @return {@code True} if the page was removed by the current method 
invoke, {@code false} if the page was already removed by another
+     *      removes or did not exist.
+     * @see #removeOnPageReplacement(FullPageId)
      */
-    public boolean contains(FullPageId fullPageId) {
-        return segmentPages.contains(fullPageId);
+    public boolean removeOnCheckpoint(FullPageId pageId) {
+        return pageIds.remove(pageId);
     }
 
     /**
-     * Returns {@code true} if it is marking was successful.
+     * Returns {@code true} if the page has not yet been written by a 
checkpoint or page replacement.
      *
-     * @param fullPageId Page id which should be marked as saved to disk.
+     * @param pageId Page ID for checking.
      */
-    public boolean markAsSaved(FullPageId fullPageId) {
-        return segmentPages.remove(fullPageId);
+    public boolean contains(FullPageId pageId) {
+        return pageIds.contains(pageId);
+    }
+
+    /** Returns the current size of all pages that will be written at a 
checkpoint or page replacement. */
+    public int size() {
+        return pageIds.size();
     }
 
     /**
-     * Returns size of all pages in current checkpoint.
+     * Block the start of the fsync phase at a checkpoint before replacing the 
page.
+     *
+     * <p>It is expected that the method will be invoked once and after that 
the {@link #unblockFsyncOnPageReplacement} will be invoked on
+     * the same page.</p>
+     *
+     * @param pageId Page ID for which page replacement will begin.
+     * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
+     * @see #removeOnPageReplacement(FullPageId)
      */
-    public int size() {
-        return segmentPages.size();
+    public void blockFsyncOnPageReplacement(FullPageId pageId) {
+        checkpointProgress.blockFsyncOnPageReplacement(pageId);
+    }
+
+    /**
+     * Unblocks the start of the fsync phase at a checkpoint after the page 
replacement is completed.
+     *
+     * <p>It is expected that the method will be invoked once and after the 
{@link #blockFsyncOnPageReplacement} for same page ID.</p>
+     *
+     * <p>The fsync phase will only be started after page replacement has been 
completed for all pages for which
+     * {@link #blockFsyncOnPageReplacement} was invoked or no page replacement 
occurred at all.</p>
+     *
+     * <p>The method must be invoked even if any error occurred, so as not to 
hang a checkpoint.</p>
+     *
+     * @param pageId Page ID for which the page replacement has ended.
+     * @param error Error on page replacement, {@code null} if missing.
+     * @see #blockFsyncOnPageReplacement(FullPageId)
+     * @see #removeOnPageReplacement(FullPageId)
+     */
+    public void unblockFsyncOnPageReplacement(FullPageId pageId, @Nullable 
Throwable error) {
+        checkpointProgress.unblockFsyncOnPageReplacement(pageId, error);
+    }
+
+    /**
+     * Blocks physical destruction of partition.
+     *
+     * <p>When the intention to destroy partition appears, {@link 
FilePageStore#isMarkedToDestroy()} is set to {@code == true} and
+     * {@link PersistentPageMemory#invalidate(int, int)} invoked at the 
beginning. And if there is a block, it waits for unblocking.
+     * Then it destroys the partition, {@link 
FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
+     *
+     * <p>It is recommended to use where physical destruction of the partition 
may have an impact, for example when writing dirty pages and
+     * executing a fsync.</p>
+     *
+     * <p>To make sure that we can physically do something with the partition 
during a block, we will need to use approximately the
+     * following code:</p>
+     * <pre><code>
+     *     checkpointProgress.blockPartitionDestruction(partitionId);
+     *
+     *     try {
+     *         FilePageStore pageStore = 
FilePageStoreManager#getStore(partitionId);
+     *
+     *         if (pageStore == null || pageStore.isMarkedToDestroy()) {
+     *             return;
+     *         }
+     *
+     *         someAction(pageStore);
+     *     } finally {
+     *         checkpointProgress.unblockPartitionDestruction(partitionId);
+     *     }
+     * </code></pre>
+     *
+     * @param groupPartitionId Pair of group ID with partition ID.
+     * @see #unblockPartitionDestruction(GroupPartitionId)
+     */
+    public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
+        checkpointProgress.blockPartitionDestruction(groupPartitionId);
+    }
+
+    /**
+     * Unblocks physical destruction of partition.
+     *
+     * <p>As soon as the last thread makes an unlock, the physical destruction 
of the partition can immediately begin.</p>
+     *
+     * @param groupPartitionId Pair of group ID with partition ID.
+     * @see #blockPartitionDestruction(GroupPartitionId)
+     */
+    public void unblockPartitionDestruction(GroupPartitionId groupPartitionId) 
{
+        checkpointProgress.unblockPartitionDestruction(groupPartitionId);
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index c258f5ba11..f8a5c5abc9 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import 
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -76,6 +77,9 @@ class CheckpointProgressImpl implements CheckpointProgress {
     /** Partitions currently being processed, for example, writing dirty pages 
or doing fsync. */
     private final PartitionProcessingCounterMap processedPartitionMap = new 
PartitionProcessingCounterMap();
 
+    /** Assistant for synchronizing page replacement and fsync phase. */
+    private final CheckpointPageReplacement checkpointPageReplacement = new 
CheckpointPageReplacement();
+
     /**
      * Constructor.
      *
@@ -337,4 +341,58 @@ class CheckpointProgressImpl implements CheckpointProgress 
{
     public @Nullable CompletableFuture<Void> 
getUnblockPartitionDestructionFuture(GroupPartitionId groupPartitionId) {
         return 
processedPartitionMap.getProcessedPartitionFuture(groupPartitionId);
     }
+
+    /**
+     * Block the start of the fsync phase at a checkpoint before replacing the 
page.
+     *
+     * <p>It is expected that the method will be invoked once and after that 
the {@link #unblockFsyncOnPageReplacement} will be invoked on
+     * the same page.</p>
+     *
+     * <p>It is expected that the method will not be invoked after {@link 
#getUnblockFsyncOnPageReplacementFuture}, since by the start of
+     * the fsync phase, write dirty pages at the checkpoint should be complete 
and no new page replacements should be started.</p>
+     *
+     * @param pageId Page ID for which page replacement is expected to begin.
+     * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
+     * @see #getUnblockFsyncOnPageReplacementFuture()
+     */
+    void blockFsyncOnPageReplacement(FullPageId pageId) {
+        checkpointPageReplacement.block(pageId);
+    }
+
+    /**
+     * Unblocks the start of the fsync phase at a checkpoint after the page 
replacement is completed.
+     *
+     * <p>It is expected that the method will be invoked once and after the 
{@link #blockFsyncOnPageReplacement} for same page ID.</p>
+     *
+     * <p>The fsync phase will only be started after page replacement has been 
completed for all pages for which
+     * {@link #blockFsyncOnPageReplacement} was invoked before {@link 
#getUnblockFsyncOnPageReplacementFuture} was invoked, or no page
+     * replacement occurred at all.</p>
+     *
+     * <p>If an error occurs on any page replacement during one checkpoint, 
the future from {@link #getUnblockFsyncOnPageReplacementFuture}
+     * will complete with the first error.</p>
+     *
+     * <p>The method must be invoked even if any error occurred, so as not to 
hang a checkpoint.</p>
+     *
+     * @param pageId Page ID for which the page replacement has ended.
+     * @param error Error on page replacement, {@code null} if missing.
+     * @see #blockFsyncOnPageReplacement(FullPageId)
+     * @see #getUnblockFsyncOnPageReplacementFuture()
+     */
+    void unblockFsyncOnPageReplacement(FullPageId pageId, @Nullable Throwable 
error) {
+        checkpointPageReplacement.unblock(pageId, error);
+    }
+
+    /**
+     * Return future that will be completed successfully if all {@link 
#blockFsyncOnPageReplacement} are completed, either if there were
+     * none, or with an error from the first {@link 
#unblockFsyncOnPageReplacement}.
+     *
+     * <p>Must be invoked before the start of the fsync phase at the 
checkpoint and wait for the future to complete in order to safely
+     * perform the phase.</p>
+     *
+     * @see #blockFsyncOnPageReplacement(FullPageId)
+     * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
+     */
+    CompletableFuture<Void> getUnblockFsyncOnPageReplacementFuture() {
+        return checkpointPageReplacement.stopBlocking();
+    }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index ce5a4c9dbc..2f8415a544 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -42,7 +42,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -258,7 +257,7 @@ class CheckpointWorkflow {
             tracker.onMarkCheckpointBeginEnd();
 
             // Page replacement is allowed only after sorting dirty pages.
-            dirtyPages = beginCheckpoint(dataRegions, 
curr.futureFor(PAGES_SORTED));
+            dirtyPages = beginCheckpoint(curr);
 
             curr.currentCheckpointPagesCount(dirtyPages.dirtyPageCount);
 
@@ -370,10 +369,7 @@ class CheckpointWorkflow {
                 .collect(toUnmodifiableList());
     }
 
-    private DataRegionsDirtyPages beginCheckpoint(
-            Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
-            CompletableFuture<?> allowToReplace
-    ) {
+    private DataRegionsDirtyPages beginCheckpoint(CheckpointProgressImpl 
checkpointProgress) {
         assert checkpointReadWriteLock.isWriteLockHeldByCurrentThread();
 
         Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = 
this.dirtyPartitionsMap;
@@ -384,7 +380,7 @@ class CheckpointWorkflow {
 
         // First, we iterate all regions that have dirty pages.
         for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
-            Collection<FullPageId> dirtyPages = 
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
+            Collection<FullPageId> dirtyPages = 
dataRegion.pageMemory().beginCheckpoint(checkpointProgress);
 
             Set<FullPageId> dirtyMetaPageIds = 
dirtyPartitionsMap.remove(dataRegion);
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index b65a6660c0..9385ebf3bd 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -437,7 +437,7 @@ public class Checkpointer extends IgniteWorker {
      * @param shutdownNow Checker of stop operation.
      * @throws IgniteInternalCheckedException If failed.
      */
-    boolean writePages(
+    private boolean writePages(
             CheckpointMetricsTracker tracker,
             CheckpointDirtyPages checkpointDirtyPages,
             CheckpointProgressImpl currentCheckpointProgress,
@@ -484,8 +484,21 @@ public class Checkpointer extends IgniteWorker {
         // Wait and check for errors.
         CompletableFuture.allOf(futures).join();
 
-        // Must re-check shutdown flag here because threads may have skipped 
some pages.
-        // If so, we should not put finish checkpoint mark.
+        // Must re-check shutdown flag here because threads may have skipped 
some pages because of it.
+        // If so, we should not finish checkpoint.
+        if (shutdownNow.getAsBoolean()) {
+            currentCheckpointProgress.fail(new NodeStoppingException("Node is 
stopping."));
+
+            return false;
+        }
+
+        // Waiting for the completion of all page replacements if present.
+        // Will complete normally or with the first error on one of the page 
replacements.
+        // join() is used intentionally as above.
+        
currentCheckpointProgress.getUnblockFsyncOnPageReplacementFuture().join();
+
+        // Must re-check shutdown flag here because threads could take a long 
time to complete the page replacement.
+        // If so, we should not finish checkpoint.
         if (shutdownNow.getAsBoolean()) {
             currentCheckpointProgress.fail(new NodeStoppingException("Node is 
stopping."));
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
index 44af8c084d..84536da010 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
@@ -23,16 +23,26 @@ import static 
org.apache.ignite.internal.util.GridUnsafe.copyMemory;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
+import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Not thread safe and stateful class for page replacement of one page with 
write() delay. This allows to write page content without holding
- * segment lock. Page data is copied into temp buffer during {@link 
#write(PersistentPageMemory, FullPageId, ByteBuffer)} and then sent to
- * real implementation by {@link #finishReplacement}.
+ * Stateful class for page replacement of one page with write() delay. This 
allows to write page content without holding
+ * segment write lock, which allows for a long time not to block the segment 
read lock due to IO writes when reading pages or writing dirty
+ * pages at a checkpoint.
+ *
+ * <p>Usage:</p>
+ * <ul>
+ *     <li>On page replacement, invoke {@link #copyPageToTemporaryBuffer}.</li>
+ *     <li>After releasing the segment write lock, invoke {@link 
#flushCopiedPageIfExists}.</li>
+ * </ul>
+ *
+ * <p>Not thread safe.</p>
  */
-public class DelayedDirtyPageWrite implements WriteDirtyPage {
+public class DelayedDirtyPageWrite {
     /** Real flush dirty page implementation. */
     private final WriteDirtyPage flushDirtyPage;
 
@@ -45,21 +55,26 @@ public class DelayedDirtyPageWrite implements 
WriteDirtyPage {
     /** Replacing pages tracker, used to register & unregister pages being 
written. */
     private final DelayedPageReplacementTracker tracker;
 
-    /** Full page id to be written on {@link #finishReplacement} or {@code 
null} if nothing to write. */
+    /** Full page id to be written on {@link #flushCopiedPageIfExists}, {@code 
null} if nothing to write. */
     private @Nullable FullPageId fullPageId;
 
-    /** Page memory to be used in {@link #finishReplacement}. */
+    /** Page memory to be used in {@link #flushCopiedPageIfExists}, {@code 
null} if nothing to write. */
     private @Nullable PersistentPageMemory pageMemory;
 
+    /**
+     * Dirty pages of the segment that need to be written at the current 
checkpoint or page replacement, {@code null} if nothing to write.
+     */
+    private @Nullable CheckpointPages checkpointPages;
+
     /**
      * Constructor.
      *
-     * @param flushDirtyPage real writer to save page to store.
-     * @param byteBufThreadLoc thread local buffers to use for pages copying.
-     * @param pageSize page size.
-     * @param tracker tracker to lock/unlock page reads.
+     * @param flushDirtyPage Real writer to IO write page to store.
+     * @param byteBufThreadLoc Thread local buffers to use for pages copying.
+     * @param pageSize Page size in bytes.
+     * @param tracker Tracker to lock/unlock page reads.
      */
-    public DelayedDirtyPageWrite(
+    DelayedDirtyPageWrite(
             WriteDirtyPage flushDirtyPage,
             ThreadLocal<ByteBuffer> byteBufThreadLoc,
             // TODO: IGNITE-17017 Move to common config
@@ -72,41 +87,71 @@ public class DelayedDirtyPageWrite implements 
WriteDirtyPage {
         this.tracker = tracker;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void write(PersistentPageMemory pageMemory, FullPageId fullPageId, 
ByteBuffer byteBuf) {
-        tracker.lock(fullPageId);
+    /**
+     * Copies a page to a temporary buffer on page replacement.
+     *
+     * @param pageMemory Persistent page memory for subsequent page IO writes.
+     * @param pageId ID of the copied page.
+     * @param originPageBuf Buffer with the full contents of the page being 
copied (from which we will copy).
+     * @param checkpointPages Dirty pages of the segment that need to be 
written at the current checkpoint or page replacement.
+     * @see #flushCopiedPageIfExists()
+     */
+    public void copyPageToTemporaryBuffer(
+            PersistentPageMemory pageMemory,
+            FullPageId pageId,
+            ByteBuffer originPageBuf,
+            CheckpointPages checkpointPages
+    ) {
+        tracker.lock(pageId);
 
         ByteBuffer threadLocalBuf = byteBufThreadLoc.get();
 
         threadLocalBuf.rewind();
 
-        long writeAddr = bufferAddress(threadLocalBuf);
-        long origBufAddr = bufferAddress(byteBuf);
+        long dstBufAddr = bufferAddress(threadLocalBuf);
+        long srcBufAddr = bufferAddress(originPageBuf);
 
-        copyMemory(origBufAddr, writeAddr, pageSize);
+        copyMemory(srcBufAddr, dstBufAddr, pageSize);
 
-        this.fullPageId = fullPageId;
+        this.fullPageId = pageId;
         this.pageMemory = pageMemory;
+        this.checkpointPages = checkpointPages;
     }
 
     /**
-     * Runs actual write if required. Method is 'no op' if there was no page 
selected for replacement.
+     * Flushes a previously copied page to disk if it was copied.
      *
-     * @throws IgniteInternalCheckedException if write failed.
+     * @throws IgniteInternalCheckedException If write failed.
+     * @see #copyPageToTemporaryBuffer(PersistentPageMemory, FullPageId, 
ByteBuffer, CheckpointPages)
      */
-    public void finishReplacement() throws IgniteInternalCheckedException {
-        if (fullPageId == null && pageMemory == null) {
+    public void flushCopiedPageIfExists() throws 
IgniteInternalCheckedException {
+        if (fullPageId == null) {
             return;
         }
 
+        assert pageMemory != null : fullPageId;
+        assert checkpointPages != null : fullPageId;
+
+        Throwable errorOnWrite = null;
+
+        
checkpointPages.blockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+
         try {
             flushDirtyPage.write(pageMemory, fullPageId, 
byteBufThreadLoc.get());
+        } catch (Throwable t) {
+            errorOnWrite = t;
+
+            throw t;
         } finally {
+            
checkpointPages.unblockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+
+            checkpointPages.unblockFsyncOnPageReplacement(fullPageId, 
errorOnWrite);
+
             tracker.unlock(fullPageId);
 
             fullPageId = null;
             pageMemory = null;
+            checkpointPages = null;
         }
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
index 1ed08d89ba..a2f603cf8f 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
@@ -163,7 +163,7 @@ public class DelayedPageReplacementTracker {
 
                 boolean add = locked.add(id);
 
-                assert add : "Double locking of page for replacement is not 
possible";
+                assert add : "Double locking of page for replacement is not 
possible: " + id;
             }
         }
 
@@ -210,7 +210,7 @@ public class DelayedPageReplacementTracker {
             synchronized (locked) {
                 boolean rmv = locked.remove(id);
 
-                assert rmv : "Unlocking page ID never locked, id " + id;
+                assert rmv : "Unlocking page ID never locked, id: " + id;
 
                 if (locked.isEmpty()) {
                     hasLockedPages = false;
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacementTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacementTest.java
new file mode 100644
index 0000000000..9c778bc0cb
--- /dev/null
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacementTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.fullPageId;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.Test;
+
+/** For {@link CheckpointPageReplacement} testing. */
+public class CheckpointPageReplacementTest {
+    @Test
+    void testBlock() {
+        var checkpointPageReplacement = new CheckpointPageReplacement();
+
+        assertDoesNotThrow(() -> checkpointPageReplacement.block(fullPageId(0, 
0)));
+        assertDoesNotThrow(() -> checkpointPageReplacement.block(fullPageId(0, 
1)));
+
+        checkpointPageReplacement.stopBlocking();
+    }
+
+    @Test
+    void testUnblock() {
+        var checkpointPageReplacement = new CheckpointPageReplacement();
+
+        checkpointPageReplacement.block(fullPageId(0, 0));
+        checkpointPageReplacement.block(fullPageId(0, 1));
+        checkpointPageReplacement.block(fullPageId(0, 2));
+        checkpointPageReplacement.block(fullPageId(0, 3));
+        checkpointPageReplacement.block(fullPageId(0, 4));
+        checkpointPageReplacement.block(fullPageId(0, 5));
+
+        assertDoesNotThrow(() -> 
checkpointPageReplacement.unblock(fullPageId(0, 0), null));
+        assertDoesNotThrow(() -> 
checkpointPageReplacement.unblock(fullPageId(0, 1), new Throwable("from test 
0")));
+        assertDoesNotThrow(() -> 
checkpointPageReplacement.unblock(fullPageId(0, 2), new Throwable("from test 
1")));
+
+        checkpointPageReplacement.stopBlocking();
+
+        assertDoesNotThrow(() -> 
checkpointPageReplacement.unblock(fullPageId(0, 3), null));
+        assertDoesNotThrow(() -> 
checkpointPageReplacement.unblock(fullPageId(0, 4), new Throwable("from test 
0")));
+        assertDoesNotThrow(() -> 
checkpointPageReplacement.unblock(fullPageId(0, 5), new Throwable("from test 
1")));
+    }
+
+    @Test
+    void testStopBlocking() {
+        var checkpointPageReplacement = new CheckpointPageReplacement();
+
+        checkpointPageReplacement.block(fullPageId(0, 0));
+        checkpointPageReplacement.block(fullPageId(0, 1));
+
+        CompletableFuture<Void> stopBlockingFuture = 
checkpointPageReplacement.stopBlocking();
+        assertFalse(stopBlockingFuture.isDone());
+
+        checkpointPageReplacement.unblock(fullPageId(0, 0), null);
+        assertFalse(stopBlockingFuture.isDone());
+
+        checkpointPageReplacement.unblock(fullPageId(0, 1), null);
+        assertTrue(stopBlockingFuture.isDone());
+
+        assertTrue(checkpointPageReplacement.stopBlocking().isDone());
+    }
+
+    @Test
+    void testStopBlockingError() {
+        var checkpointPageReplacement = new CheckpointPageReplacement();
+
+        checkpointPageReplacement.block(fullPageId(0, 0));
+        checkpointPageReplacement.block(fullPageId(0, 1));
+
+        CompletableFuture<Void> stopBlockingFuture = 
checkpointPageReplacement.stopBlocking();
+        assertFalse(stopBlockingFuture.isDone());
+
+        checkpointPageReplacement.unblock(fullPageId(0, 0), new 
RuntimeException("from test 1"));
+        assertThat(stopBlockingFuture, willThrow(RuntimeException.class, "from 
test 1"));
+
+        checkpointPageReplacement.unblock(fullPageId(0, 1), new 
TimeoutException("from test 2"));
+        assertThat(stopBlockingFuture, willThrow(RuntimeException.class, "from 
test 1"));
+
+        assertThat(checkpointPageReplacement.stopBlocking(), 
willThrow(RuntimeException.class, "from test 1"));
+    }
+
+    @Test
+    void testStopBlockingNoPageReplacement() {
+        assertTrue(new CheckpointPageReplacement().stopBlocking().isDone());
+    }
+}
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
index 0a41d30b81..38b742d58f 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
@@ -17,34 +17,29 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.fullPageId;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Collections;
 import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.junit.jupiter.api.Test;
 
-/**
- * For {@link CheckpointPages} testing.
- */
+/** For {@link CheckpointPages} testing. */
 public class CheckpointPagesTest {
     @Test
     void testContains() {
-        CheckpointPages checkpointPages = new CheckpointPages(
-                Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
-                nullCompletedFuture()
-        );
+        CheckpointPages checkpointPages = createCheckpointPages(new 
FullPageId(0, 0), new FullPageId(1, 0));
 
         assertTrue(checkpointPages.contains(new FullPageId(0, 0)));
         assertTrue(checkpointPages.contains(new FullPageId(1, 0)));
@@ -55,65 +50,84 @@ public class CheckpointPagesTest {
 
     @Test
     void testSize() {
-        CheckpointPages checkpointPages = new CheckpointPages(
-                Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
-                nullCompletedFuture()
-        );
+        CheckpointPages checkpointPages = createCheckpointPages(fullPageId(0, 
0), fullPageId(1, 0));
 
         assertEquals(2, checkpointPages.size());
     }
 
     @Test
-    void testMarkAsSaved() {
-        CheckpointPages checkpointPages = new CheckpointPages(
-                new HashSet<>(Set.of(new FullPageId(0, 0), new FullPageId(1, 
0), new FullPageId(2, 0))),
-                nullCompletedFuture()
-        );
+    void testRemoveOnCheckpoint() {
+        CheckpointPages checkpointPages = createCheckpointPages(fullPageId(0, 
0), fullPageId(1, 0), fullPageId(2, 0));
 
-        assertTrue(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+        assertTrue(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
         assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
         assertEquals(2, checkpointPages.size());
 
-        assertFalse(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+        assertFalse(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
         assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
         assertEquals(2, checkpointPages.size());
 
-        assertTrue(checkpointPages.markAsSaved(new FullPageId(1, 0)));
+        assertTrue(checkpointPages.removeOnCheckpoint(fullPageId(1, 0)));
         assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
         assertEquals(1, checkpointPages.size());
     }
 
     @Test
-    void testAllowToSave() throws Exception {
-        Set<FullPageId> pages = Set.of(new FullPageId(0, 0), new FullPageId(1, 
0), new FullPageId(2, 0));
-
-        CheckpointPages checkpointPages = new CheckpointPages(pages, 
nullCompletedFuture());
+    void testRemoveOnPageReplacement() throws Exception {
+        var checkpointProgress = new CheckpointProgressImpl(10);
 
-        assertTrue(checkpointPages.allowToSave(new FullPageId(0, 0)));
-        assertTrue(checkpointPages.allowToSave(new FullPageId(1, 0)));
-        assertTrue(checkpointPages.allowToSave(new FullPageId(2, 0)));
+        CheckpointPages checkpointPages = 
createCheckpointPages(checkpointProgress, fullPageId(0, 0), fullPageId(1, 0));
 
-        assertFalse(checkpointPages.allowToSave(new FullPageId(3, 0)));
+        // Let's make sure that the check will not complete until the dirty 
page sorting phase completes.
+        checkpointProgress.transitTo(LOCK_RELEASED);
 
-        IgniteInternalCheckedException exception = assertThrows(
-                IgniteInternalCheckedException.class,
-                () -> new CheckpointPages(pages, failedFuture(new 
Exception("test"))).allowToSave(new FullPageId(0, 0))
+        CompletableFuture<Boolean> removeOnPageReplacementFuture = runAsync(
+                () -> checkpointPages.removeOnPageReplacement(fullPageId(0, 0))
         );
+        assertThat(removeOnPageReplacementFuture, willTimeoutFast());
 
-        assertThat(exception.getCause(), instanceOf(Exception.class));
-        assertThat(exception.getCause().getMessage(), equalTo("test"));
+        checkpointProgress.transitTo(PAGES_SORTED);
+        assertThat(removeOnPageReplacementFuture, willBe(true));
+        assertFalse(checkpointPages.contains(fullPageId(0, 0)));
+        assertEquals(1, checkpointPages.size());
 
-        exception = assertThrows(
-                IgniteInternalCheckedException.class,
-                () -> {
-                    CompletableFuture<Object> future = new 
CompletableFuture<>();
+        assertFalse(checkpointPages.removeOnPageReplacement(fullPageId(0, 0)));
+        assertFalse(checkpointPages.contains(fullPageId(0, 0)));
+        assertEquals(1, checkpointPages.size());
 
-                    future.cancel(true);
+        assertTrue(checkpointPages.removeOnPageReplacement(fullPageId(1, 0)));
+        assertFalse(checkpointPages.contains(fullPageId(1, 0)));
+        assertEquals(0, checkpointPages.size());
+    }
+
+    @Test
+    void testRemoveOnPageReplacementErrorOnWaitPageSortingPhase() {
+        var checkpointProgress = new CheckpointProgressImpl(10);
 
-                    new CheckpointPages(pages, future).allowToSave(new 
FullPageId(0, 0));
-                }
+        CheckpointPages checkpointPages = 
createCheckpointPages(checkpointProgress);
+
+        checkpointProgress.fail(new Exception("from test"));
+
+        assertThrows(
+                Exception.class,
+                () -> checkpointPages.removeOnPageReplacement(fullPageId(0, 
0)),
+                "from test"
         );
+    }
+
+    private static CheckpointPages createCheckpointPages(FullPageId... 
pageIds) {
+        var checkpointProgress = new CheckpointProgressImpl(10);
+
+        checkpointProgress.transitTo(PAGES_SORTED);
+
+        return createCheckpointPages(checkpointProgress, pageIds);
+    }
+
+    private static CheckpointPages 
createCheckpointPages(CheckpointProgressImpl checkpointProgress, FullPageId... 
pageIds) {
+        var set = new HashSet<FullPageId>(pageIds.length);
+
+        Collections.addAll(set, pageIds);
 
-        assertThat(exception.getCause(), 
instanceOf(CancellationException.class));
+        return new CheckpointPages(set, checkpointProgress);
     }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 9e45b41e4b..2c6c00ccc0 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -665,7 +665,7 @@ public class CheckpointWorkflowTest extends 
BaseIgniteAbstractTest {
     private static PersistentPageMemory newPageMemory(Collection<FullPageId> 
pageIds) {
         PersistentPageMemory mock = mock(PersistentPageMemory.class);
 
-        
when(mock.beginCheckpoint(any(CompletableFuture.class))).thenReturn(pageIds);
+        
when(mock.beginCheckpoint(any(CheckpointProgress.class))).thenReturn(pageIds);
 
         return mock;
     }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
index 217e19b01f..0d4764f9d0 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
+++ 
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
 import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 
 import java.util.Arrays;
 import org.apache.ignite.internal.pagememory.FullPageId;
@@ -37,4 +39,14 @@ class TestCheckpointUtils {
                 
Arrays.stream(dirtyPages).map(GroupPartitionId::convert).collect(toSet())
         );
     }
+
+    /**
+     * Creates new full page ID.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    static FullPageId fullPageId(int groupId, int partitionId) {
+        return new FullPageId(pageId(partitionId, FLAG_DATA, 0), groupId);
+    }
 }
diff --git 
a/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
 
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
index 1ed535d7cd..af9a9a9607 100644
--- 
a/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
+++ 
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.pagememory;
 
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
 
 import com.google.auto.service.AutoService;
 import java.util.Collection;
@@ -26,6 +28,7 @@ import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
 import org.apache.ignite.internal.pagememory.io.PageIo;
 import org.apache.ignite.internal.pagememory.io.PageIoModule;
+import 
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FakePartitionMetaIo;
 
 /**
  * Test implementation of {@link PageIoModule}.
@@ -35,13 +38,18 @@ public class TestPageIoModule implements PageIoModule {
     /** Last possible value for IO type. */
     public static final int TEST_PAGE_TYPE = PageIo.MAX_IO_TYPE;
 
+    /** {@link FakePartitionMetaIo} type. */
+    public static final int FAKE_PARTITION_META_PAGE_IO_TYPE = TEST_PAGE_TYPE 
- 1;
+
+    /** {@link TestSimpleValuePageIo} type.*/
+    public static final int TEST_SIMPLE_VALUE_PAGE_IO_TYPE = 
FAKE_PARTITION_META_PAGE_IO_TYPE - 1;
+
     /** Version 1, minimal possible value. */
     public static final int TEST_PAGE_VER = 1;
 
-    /** {@inheritDoc} */
     @Override
     public Collection<IoVersions<?>> ioVersions() {
-        return List.of(new IoVersions<>(new TestPageIo()));
+        return List.of(new IoVersions<>(new TestPageIo()), 
FakePartitionMetaIo.VERSIONS, new IoVersions<>(new TestSimpleValuePageIo()));
     }
 
     /**
@@ -55,9 +63,39 @@ public class TestPageIoModule implements PageIoModule {
             super(TEST_PAGE_TYPE, TEST_PAGE_VER, FLAG_DATA);
         }
 
-        /** {@inheritDoc} */
         @Override
         protected void printPage(long addr, int pageSize, IgniteStringBuilder 
sb) {
         }
     }
+
+    /** Test implementation of {@link PageIo} with simple value (long). */
+    public static class TestSimpleValuePageIo extends PageIo {
+        private static final int LONG_VALUE_OFF = PageIo.COMMON_HEADER_END;
+
+        /** Constructor. */
+        public TestSimpleValuePageIo() {
+            super(TEST_SIMPLE_VALUE_PAGE_IO_TYPE, TEST_PAGE_VER, FLAG_DATA);
+        }
+
+        @Override
+        public void initNewPage(long pageAddr, long pageId, int pageSize) {
+            super.initNewPage(pageAddr, pageId, pageSize);
+
+            setLongValue(pageAddr, 0);
+        }
+
+        @Override
+        protected void printPage(long addr, int pageSize, IgniteStringBuilder 
sb) {
+        }
+
+        /** Reads long value. */
+        public static long getLongValue(long pageAddr) {
+            return getLong(pageAddr, LONG_VALUE_OFF);
+        }
+
+        /** Writes long value. */
+        public static void setLongValue(long pageAddr, long value) {
+            putLong(pageAddr, LONG_VALUE_OFF, value);
+        }
+    }
 }
diff --git 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
 
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
similarity index 69%
rename from 
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
rename to 
modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
index a4f8b736c2..0a9f695d7a 100644
--- 
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
+++ 
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
@@ -17,24 +17,27 @@
 
 package org.apache.ignite.internal.pagememory.persistence;
 
+import static 
org.apache.ignite.internal.pagememory.TestPageIoModule.FAKE_PARTITION_META_PAGE_IO_TYPE;
+import static 
org.apache.ignite.internal.pagememory.TestPageIoModule.TEST_PAGE_VER;
+
 import java.util.UUID;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.pagememory.io.IoVersions;
 import org.apache.ignite.internal.pagememory.persistence.io.PartitionMetaIo;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * Simple implementation of {@link PartitionMeta} for testing purposes.
- */
+/** Simple implementation of {@link PartitionMeta} for testing purposes. */
 public class FakePartitionMeta extends PartitionMeta {
+    public static final FakePartitionMetaFactory FACTORY = new 
FakePartitionMetaFactory();
 
-    public static final PartitionMetaFactory FACTORY = new 
FakePartitionMetaFactory();
-
-    /**
-     * Constructor.
-     */
+    /** Constructor. */
     public FakePartitionMeta() {
-        super(0);
+        this(0);
+    }
+
+    /** Constructor. */
+    public FakePartitionMeta(int pageCount) {
+        super(pageCount);
     }
 
     /**
@@ -51,7 +54,7 @@ public class FakePartitionMeta extends PartitionMeta {
 
     @Override
     protected FakePartitionMetaSnapshot buildSnapshot(@Nullable UUID 
checkpointId) {
-        return new FakePartitionMetaSnapshot(checkpointId);
+        return new FakePartitionMetaSnapshot(checkpointId, pageCount());
     }
 
     /**
@@ -60,12 +63,17 @@ public class FakePartitionMeta extends PartitionMeta {
     public static class FakePartitionMetaSnapshot implements 
PartitionMetaSnapshot {
         private final UUID checkpointId;
 
-        public FakePartitionMetaSnapshot(@Nullable UUID checkpointId) {
+        private final int pageCount;
+
+        FakePartitionMetaSnapshot(@Nullable UUID checkpointId, int pageCount) {
             this.checkpointId = checkpointId;
+            this.pageCount = pageCount;
         }
 
         @Override
         public void writeTo(PartitionMetaIo metaIo, long pageAddr) {
+            FakePartitionMetaIo fakePartitionMetaIo = (FakePartitionMetaIo) 
metaIo;
+            fakePartitionMetaIo.setPageCount(pageAddr, pageCount);
         }
 
         @Override
@@ -79,14 +87,16 @@ public class FakePartitionMeta extends PartitionMeta {
      */
     public static class FakePartitionMetaIo extends PartitionMetaIo {
         /** I/O versions. */
-        public static final IoVersions<FakePartitionMetaIo> VERSIONS = new 
IoVersions<>(new FakePartitionMetaIo(1, 1));
+        public static final IoVersions<FakePartitionMetaIo> VERSIONS = new 
IoVersions<>(
+                new FakePartitionMetaIo(FAKE_PARTITION_META_PAGE_IO_TYPE, 
TEST_PAGE_VER)
+        );
 
         /**
          * Constructor.
          *
          * @param ver Page format version.
          */
-        protected FakePartitionMetaIo(int type, int ver) {
+        FakePartitionMetaIo(int type, int ver) {
             super(type, ver);
         }
 
@@ -98,13 +108,11 @@ public class FakePartitionMeta extends PartitionMeta {
         }
     }
 
-    /**
-     * Simple implementation of {@link PartitionMetaFactory} for testing 
purposes.
-     */
+    /** Simple implementation of {@link PartitionMetaFactory} for testing 
purposes. */
     public static class FakePartitionMetaFactory implements 
PartitionMetaFactory {
         @Override
-        public FakePartitionMeta createPartitionMeta(UUID checkpointId, 
PartitionMetaIo metaIo, long pageAddr) {
-            return new FakePartitionMeta().init(checkpointId);
+        public FakePartitionMeta createPartitionMeta(@Nullable UUID 
checkpointId, PartitionMetaIo metaIo, long pageAddr) {
+            return new 
FakePartitionMeta(metaIo.getPageCount(pageAddr)).init(checkpointId);
         }
 
         @Override


Reply via email to