This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9446e8591a IGNITE-23212 Fix the situation when page replacement occurs
after the completion of the checkpoint (#4437)
9446e8591a is described below
commit 9446e8591a24dfa44d866483ff2a7383cac99118
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Sep 25 16:35:49 2024 +0300
IGNITE-23212 Fix the situation when page replacement occurs after the
completion of the checkpoint (#4437)
---
modules/page-memory/build.gradle | 3 +
.../tree/persistence/ItPageReplacementTest.java | 414 +++++++++++++++++++++
.../pagememory/persistence/PageStoreWriter.java | 12 +-
.../persistence/PersistentPageMemory.java | 102 +++--
.../persistence/checkpoint/CheckpointManager.java | 1 -
.../checkpoint/CheckpointPageReplacement.java | 138 +++++++
.../persistence/checkpoint/CheckpointPages.java | 161 ++++++--
.../checkpoint/CheckpointProgressImpl.java | 58 +++
.../persistence/checkpoint/CheckpointWorkflow.java | 10 +-
.../persistence/checkpoint/Checkpointer.java | 19 +-
.../replacement/DelayedDirtyPageWrite.java | 91 +++--
.../replacement/DelayedPageReplacementTracker.java | 4 +-
.../checkpoint/CheckpointPageReplacementTest.java | 107 ++++++
.../checkpoint/CheckpointPagesTest.java | 110 +++---
.../checkpoint/CheckpointWorkflowTest.java | 2 +-
.../checkpoint/TestCheckpointUtils.java | 12 +
.../internal/pagememory/TestPageIoModule.java | 44 ++-
.../pagememory/persistence/FakePartitionMeta.java | 44 ++-
18 files changed, 1160 insertions(+), 172 deletions(-)
diff --git a/modules/page-memory/build.gradle b/modules/page-memory/build.gradle
index 66dc53d07a..329644ce4e 100644
--- a/modules/page-memory/build.gradle
+++ b/modules/page-memory/build.gradle
@@ -54,8 +54,11 @@ dependencies {
testFixturesImplementation(testFixtures(project(':ignite-core')))
testFixturesImplementation libs.mockito.core
testFixturesImplementation libs.auto.service.annotations
+ testFixturesImplementation libs.jetbrains.annotations
integrationTestImplementation project(':ignite-storage-api')
+ integrationTestImplementation project(':ignite-failure-handler')
+ integrationTestImplementation project(':ignite-file-io')
integrationTestImplementation(testFixtures(project))
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java
new file mode 100644
index 0000000000..4d38230447
--- /dev/null
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItPageReplacementTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.tree.persistence;
+
+import static
org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
+import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.components.LogSyncer;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.pagememory.DataRegion;
+import
org.apache.ignite.internal.pagememory.TestPageIoModule.TestSimpleValuePageIo;
+import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
+import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
+import
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
+import
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
+import org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import
org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Integration tests for page replacement that happens while a checkpoint is in progress. */
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+public class ItPageReplacementTest extends BaseIgniteAbstractTest {
+    private static final String NODE_NAME = "test";
+
+    private static final int GROUP_ID = 1;
+
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_COUNT = 1;
+
+    private static final int PAGE_SIZE = 512;
+
+    private static final int PAGE_COUNT = 1024;
+
+    /** Region size is deliberately tiny (PAGE_COUNT pages) so that page replacement is easy to trigger. */
+    private static final int MAX_MEMORY_SIZE = PAGE_COUNT * PAGE_SIZE;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration("mock.checkpointThreads = 1")
+    private PageMemoryCheckpointConfiguration checkpointConfig;
+
+    @InjectConfiguration(
+            polymorphicExtensions = PersistentPageMemoryProfileConfigurationSchema.class,
+            value = "mock = {"
+                    + "engine=aipersist, "
+                    + "size=" + MAX_MEMORY_SIZE
+                    + "}"
+    )
+    private StorageProfileConfiguration storageProfileCfg;
+
+    private FilePageStoreManager filePageStoreManager;
+
+    private PartitionMetaManager partitionMetaManager;
+
+    private CheckpointManager checkpointManager;
+
+    private PersistentPageMemory pageMemory;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        FailureManager failureManager = mock(FailureManager.class);
+
+        var ioRegistry = new TestPageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        filePageStoreManager = new FilePageStoreManager(
+                NODE_NAME,
+                workDir,
+                new RandomAccessFileIoFactory(),
+                PAGE_SIZE,
+                failureManager
+        );
+
+        partitionMetaManager = new PartitionMetaManager(ioRegistry, PAGE_SIZE, FakePartitionMeta.FACTORY);
+
+        var dataRegionList = new ArrayList<DataRegion<PersistentPageMemory>>();
+
+        checkpointManager = new CheckpointManager(
+                NODE_NAME,
+                null,
+                null,
+                failureManager,
+                checkpointConfig,
+                filePageStoreManager,
+                partitionMetaManager,
+                dataRegionList,
+                ioRegistry,
+                mock(LogSyncer.class),
+                PAGE_SIZE
+        );
+
+        pageMemory = new PersistentPageMemory(
+                (PersistentPageMemoryProfileConfiguration) fixConfiguration(storageProfileCfg),
+                ioRegistry,
+                new long[]{MAX_MEMORY_SIZE},
+                10 * MiB,
+                filePageStoreManager,
+                null,
+                (pageMemory0, fullPageId, buf) -> checkpointManager.writePageToDeltaFilePageStore(pageMemory0, fullPageId, buf, true),
+                checkpointManager.checkpointTimeoutLock(),
+                PAGE_SIZE
+        );
+
+        dataRegionList.add(() -> pageMemory);
+
+        filePageStoreManager.start();
+        checkpointManager.start();
+        pageMemory.start();
+
+        createPartitionFilePageStoresIfMissing();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                checkpointManager == null ? null : checkpointManager::stop,
+                pageMemory == null ? null : () -> pageMemory.stop(true),
+                filePageStoreManager == null ? null : filePageStoreManager::stop
+        );
+    }
+
+    /** Checks that page replacement will occur after the start of the checkpoint and before its end. */
+    @Test
+    void testPageReplacement() throws Throwable {
+        var startWritePagesOnCheckpointFuture = new CompletableFuture<Void>();
+        var continueWritePagesOnCheckpointFuture = new CompletableFuture<Void>();
+
+        var startWritePagesOnPageReplacementFuture = new CompletableFuture<Void>();
+
+        CheckpointProgress checkpointProgress = inCheckpointReadLock(() -> {
+            // We make only one dirty page for the checkpoint to work.
+            createAndFillTestSimpleValuePages(1);
+
+            FilePageStore filePageStore = filePageStoreManager.getStore(new GroupPartitionId(GROUP_ID, PARTITION_ID));
+
+            // First time the method should be invoked by the checkpoint writer, let's hold it for page replacement.
+            doAnswer(invocation -> {
+                startWritePagesOnCheckpointFuture.complete(null);
+
+                assertThat(continueWritePagesOnCheckpointFuture, willCompleteSuccessfully());
+
+                return invocation.callRealMethod();
+            }).doAnswer(invocation -> {
+                // Second time the method should be invoked on page replacement.
+                startWritePagesOnPageReplacementFuture.complete(null);
+
+                return invocation.callRealMethod();
+            }).when(filePageStore).getOrCreateNewDeltaFile(any(), any());
+
+            // Trigger a checkpoint so that it writes a meta page and one dirty page. We do it under the checkpoint read lock so that
+            // a background checkpoint cannot start before this one is triggered.
+            return checkpointManager.forceCheckpoint("for test");
+        });
+
+        CompletableFuture<Void> finishCheckpointFuture = checkpointProgress.futureFor(FINISHED);
+
+        // Let's wait for the checkpoint writer to start writing the first page.
+        assertThat(startWritePagesOnCheckpointFuture, willCompleteSuccessfully());
+        // Let's make sure that no one has tried to write another page.
+        assertFalse(startWritePagesOnPageReplacementFuture.isDone());
+
+        // We will create dirty pages until the page replacement occurs.
+        // Asynchronously, so as not to run into deadlocks with the held checkpoint writer.
+        assertThat(
+                runAsync(() -> inCheckpointReadLock(
+                        () -> createAndFillTestSimpleValuePages(() -> !startWritePagesOnPageReplacementFuture.isDone())
+                )),
+                willCompleteSuccessfully()
+        );
+        assertFalse(finishCheckpointFuture.isDone());
+
+        continueWritePagesOnCheckpointFuture.complete(null);
+        assertThat(finishCheckpointFuture, willCompleteSuccessfully());
+        assertTrue(pageMemory.pageReplacementOccurred());
+    }
+
+    /**
+     * Checks that the checkpoint neither fsyncs the delta file nor finishes while a concurrent page replacement is still writing its
+     * page, and that both complete once the page replacement is released.
+     */
+    @Test
+    void testFsyncDeltaFilesWillNotStartOnCheckpointUntilPageReplacementIsComplete() throws Exception {
+        var startWritePagesOnCheckpointFuture = new CompletableFuture<Void>();
+        var continueWritePagesOnCheckpointFuture = new CompletableFuture<Void>();
+
+        var startWritePagesOnPageReplacementFuture = new CompletableFuture<Void>();
+        var continueWritePagesOnPageReplacementFuture = new CompletableFuture<Void>();
+
+        var deltaFileIoFuture = new CompletableFuture<DeltaFilePageStoreIo>();
+
+        CheckpointProgress checkpointProgress = inCheckpointReadLock(() -> {
+            // We make only one dirty page for the checkpoint to work.
+            createAndFillTestSimpleValuePages(1);
+
+            FilePageStore filePageStore = filePageStoreManager.getStore(new GroupPartitionId(GROUP_ID, PARTITION_ID));
+
+            // First time the method should be invoked by the checkpoint writer, let's hold it for page replacement.
+            doAnswer(invocation -> {
+                // The real method returns CompletableFuture<DeltaFilePageStoreIo>; Mockito only exposes it as Object.
+                @SuppressWarnings("unchecked")
+                CompletableFuture<DeltaFilePageStoreIo> callRealMethodResult =
+                        (CompletableFuture<DeltaFilePageStoreIo>) invocation.callRealMethod();
+
+                // Wrap the delta file IO into a spy so that we can later verify whether it was fsynced.
+                callRealMethodResult = callRealMethodResult
+                        .handle((deltaFilePageStoreIo, throwable) -> {
+                            if (throwable != null) {
+                                deltaFileIoFuture.completeExceptionally(throwable);
+
+                                throw new CompletionException(throwable);
+                            } else {
+                                deltaFilePageStoreIo = spy(deltaFilePageStoreIo);
+
+                                deltaFileIoFuture.complete(deltaFilePageStoreIo);
+
+                                return deltaFilePageStoreIo;
+                            }
+                        });
+
+                startWritePagesOnCheckpointFuture.complete(null);
+
+                assertThat(continueWritePagesOnCheckpointFuture, willCompleteSuccessfully());
+
+                return callRealMethodResult;
+            }).doAnswer(invocation -> {
+                // Second time the method should be invoked on page replacement, let's hold it.
+                startWritePagesOnPageReplacementFuture.complete(null);
+
+                assertThat(continueWritePagesOnPageReplacementFuture, willCompleteSuccessfully());
+
+                return invocation.callRealMethod();
+            }).when(filePageStore).getOrCreateNewDeltaFile(any(), any());
+
+            doReturn(deltaFileIoFuture).when(filePageStore).getNewDeltaFile();
+
+            // Trigger a checkpoint so that it writes a meta page and one dirty page. We do it under the checkpoint read lock so that
+            // a background checkpoint cannot start before this one is triggered.
+            return checkpointManager.forceCheckpoint("for test");
+        });
+
+        CompletableFuture<Void> finishCheckpointFuture = checkpointProgress.futureFor(FINISHED);
+
+        // Let's wait for the checkpoint writer to start writing the first page.
+        assertThat(startWritePagesOnCheckpointFuture, willCompleteSuccessfully());
+        assertThat(deltaFileIoFuture, willCompleteSuccessfully());
+
+        // We will create dirty pages until the page replacement occurs.
+        // Asynchronously, so as not to run into deadlocks with the held checkpoint writer.
+        CompletableFuture<Void> createPagesForPageReplacementFuture = runAsync(
+                () -> inCheckpointReadLock(() -> createAndFillTestSimpleValuePages(() -> !startWritePagesOnPageReplacementFuture.isDone()))
+        );
+        assertThat(startWritePagesOnPageReplacementFuture, willCompleteSuccessfully());
+        assertFalse(createPagesForPageReplacementFuture.isDone());
+
+        // Let's release the checkpoint and make sure that it does not complete and fsync does not occur until the page replacement is
+        // complete.
+        continueWritePagesOnCheckpointFuture.complete(null);
+        assertThat(finishCheckpointFuture, willTimeoutFast());
+        // 250 by analogy with willTimeoutFast().
+        // NOTE: Mockito's timeout(...) returns as soon as the verification passes, so timeout(250).times(0) succeeds on the first
+        // check. The actual waiting is provided by the willTimeoutFast() assertion above; after(250).never() would wait the full time.
+        verify(deltaFileIoFuture.join(), timeout(250).times(0)).sync();
+        assertFalse(createPagesForPageReplacementFuture.isDone());
+
+        // Let's release the page replacement and make sure everything ends well.
+        continueWritePagesOnPageReplacementFuture.complete(null);
+
+        assertThat(finishCheckpointFuture, willCompleteSuccessfully());
+        assertThat(createPagesForPageReplacementFuture, willCompleteSuccessfully());
+        verify(deltaFileIoFuture.join()).sync();
+    }
+
+    /** Initializes the page as a {@link TestSimpleValuePageIo} page and stores a value derived from its index, under the page write lock. */
+    private void createAndFillTestSimpleValuePage(long pageId) throws Exception {
+        long page = pageMemory.acquirePage(GROUP_ID, pageId);
+
+        try {
+            long pageAddr = pageMemory.writeLock(GROUP_ID, pageId, page);
+
+            try {
+                new TestSimpleValuePageIo().initNewPage(pageAddr, pageId, PAGE_SIZE);
+
+                TestSimpleValuePageIo.setLongValue(pageAddr, pageIndex(pageId) * 3L);
+            } finally {
+                pageMemory.writeUnlock(GROUP_ID, pageId, page, true);
+            }
+        } finally {
+            pageMemory.releasePage(GROUP_ID, pageId, page);
+        }
+    }
+
+    /**
+     * Reads or creates the file page store and partition meta for every test partition and registers them. Stores are registered as
+     * Mockito spies so that tests can stub their methods.
+     */
+    private void createPartitionFilePageStoresIfMissing() throws Exception {
+        ByteBuffer buffer = allocateBuffer(PAGE_SIZE);
+
+        try {
+            for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
+                var groupPartitionId = new GroupPartitionId(GROUP_ID, partitionId);
+
+                FilePageStore filePageStore = filePageStoreManager.readOrCreateStore(groupPartitionId, buffer.rewind());
+
+                filePageStore.ensure();
+
+                PartitionMeta partitionMeta = partitionMetaManager.readOrCreateMeta(
+                        null,
+                        groupPartitionId,
+                        filePageStore,
+                        buffer.rewind()
+                );
+
+                filePageStore.pages(partitionMeta.pageCount());
+
+                filePageStore.setPageAllocationListener(pageIdx -> {
+                    assert checkpointManager.checkpointTimeoutLock().checkpointLockIsHeldByThread();
+
+                    CheckpointProgress checkpointProgress = checkpointManager.lastCheckpointProgress();
+
+                    partitionMeta.incrementPageCount(checkpointProgress == null ? null : checkpointProgress.id());
+                });
+
+                filePageStoreManager.addStore(groupPartitionId, spy(filePageStore));
+                partitionMetaManager.addMeta(groupPartitionId, partitionMeta);
+            }
+        } finally {
+            freeBuffer(buffer);
+        }
+    }
+
+    /** Calls the callable while holding the checkpoint read lock and returns its result. */
+    private <V> V inCheckpointReadLock(Callable<V> callable) throws Exception {
+        checkpointManager.checkpointTimeoutLock().checkpointReadLock();
+
+        try {
+            return callable.call();
+        } finally {
+            checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
+        }
+    }
+
+    /** Runs the action while holding the checkpoint read lock. */
+    private void inCheckpointReadLock(RunnableX runnableX) throws Throwable {
+        checkpointManager.checkpointTimeoutLock().checkpointReadLock();
+
+        try {
+            runnableX.run();
+        } finally {
+            checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
+        }
+    }
+
+    /** Allocates and fills the given number of data pages in the test partition. */
+    private void createAndFillTestSimpleValuePages(int pageCount) throws Exception {
+        for (int i = 0; i < pageCount; i++) {
+            createAndFillTestSimpleValuePage(pageMemory.allocatePage(null, GROUP_ID, PARTITION_ID, FLAG_DATA));
+        }
+    }
+
+    /** Keeps allocating and filling data pages in the test partition while the predicate returns {@code true}. */
+    private void createAndFillTestSimpleValuePages(BooleanSupplier continuePredicate) throws Exception {
+        while (continuePredicate.getAsBoolean()) {
+            createAndFillTestSimpleValuePage(pageMemory.allocatePage(null, GROUP_ID, PARTITION_ID, FLAG_DATA));
+        }
+    }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
index 54783ce690..1c43ad6675 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
@@ -18,22 +18,20 @@
package org.apache.ignite.internal.pagememory.persistence;
import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
-/**
- * Interface for write page to {@link PageStore}.
- */
+/** Interface for writing dirty pages to {@link PageStore} on checkpoint. */
public interface PageStoreWriter {
/**
- * Callback for write page. {@link PersistentPageMemory} will copy page
content to buffer before call.
+ * Writes page to {@link PageStore}. {@link PersistentPageMemory} will
copy page content to buffer before call.
*
* @param fullPageId Page ID to get byte buffer for. The page ID must be
present in the collection returned by the {@link
- * PersistentPageMemory#beginCheckpoint(CompletableFuture)} method call.
+ * PersistentPageMemory#beginCheckpoint} method call.
* @param buf Temporary buffer to write changes into.
- * @param tag {@code Partition generation} if data was read.
+ * @param tag {@code Partition generation} if data was read, or {@link PersistentPageMemory#TRY_AGAIN_TAG} if the write lock for
+ * the page could not be acquired and the write must be retried later.
* @throws IgniteInternalCheckedException If write page failed.
*/
void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws
IgniteInternalCheckedException;
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 2454435dcc..8f4e2ec164 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -68,7 +68,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,8 +92,10 @@ import
org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointMetricsTracker;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
import
org.apache.ignite.internal.pagememory.persistence.replacement.ClockPageReplacementPolicyFactory;
+import
org.apache.ignite.internal.pagememory.persistence.replacement.DelayedDirtyPageWrite;
import
org.apache.ignite.internal.pagememory.persistence.replacement.DelayedPageReplacementTracker;
import
org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicy;
import
org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicyFactory;
@@ -509,7 +510,6 @@ public class PersistentPageMemory implements PageMemory {
return dirty(absPtr);
}
- /** {@inheritDoc} */
@Override
public long allocatePageNoReuse(int grpId, int partId, byte flags) throws
IgniteInternalCheckedException {
assert partId >= 0 && partId <= MAX_PARTITION_ID : partId;
@@ -526,7 +526,6 @@ public class PersistentPageMemory implements PageMemory {
seg.writeLock().lock();
-
try {
FullPageId fullId = new FullPageId(pageId, grpId);
@@ -587,10 +586,9 @@ public class PersistentPageMemory implements PageMemory {
throw e;
} finally {
seg.writeLock().unlock();
- }
- // Finish replacement only when an exception wasn't thrown otherwise
it possible to corrupt B+Tree.
- delayedPageReplacementTracker.delayedPageWrite().finishReplacement();
+
delayedPageReplacementTracker.delayedPageWrite().flushCopiedPageIfExists();
+ }
return pageId;
}
@@ -800,7 +798,7 @@ public class PersistentPageMemory implements PageMemory {
} finally {
seg.writeLock().unlock();
-
delayedPageReplacementTracker.delayedPageWrite().finishReplacement();
+
delayedPageReplacementTracker.delayedPageWrite().flushCopiedPageIfExists();
if (readPageFromStore) {
assert lockedPageAbsPtr != -1 : "Page is expected to have a
valid address [pageId=" + fullId
@@ -1495,33 +1493,61 @@ public class PersistentPageMemory implements PageMemory
{
}
/**
- * Prepares a page removal for page replacement, if needed.
+ * Tries to replace the page.
*
- * @param fullPageId Candidate page full ID.
- * @param absPtr Absolute pointer of the page to evict.
- * @return {@code True} if it is ok to replace this page, {@code
false} if another page should be selected.
- * @throws IgniteInternalCheckedException If failed to write page to
the underlying store during eviction.
+ * <p>If the page is pinned by another thread (for example, by the checkpoint dirty page writer or by a thread that is modifying
+ * it), nothing is done and {@code false} is returned so that another page can be selected.</p>
+ *
+ * <p>Otherwise, the replacement is successful in the following cases:</p>
+ * <ul>
+ *     <li>Page is not dirty - it is simply removed from the loaded pages.</li>
+ *     <li>Page is dirty, a checkpoint is in progress, and all of the following sub-conditions are met:
+ *         <ul>
+ *             <li>Page belongs to the current checkpoint.</li>
+ *             <li>The dirty page sorting phase is complete; otherwise we wait for it. This is necessary so that we can safely
+ *             create partition delta files in which the dirty page order must be preserved.</li>
+ *             <li>The checkpoint dirty page writer has either not started writing the page yet or has already written it.</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ *
+ * <p>It is expected that if the method returns {@code true}, it will
not be invoked again for the same page ID.</p>
+ *
+ * <p>If we intend to replace a page, it is important for us to block
the delta file fsync phase of the checkpoint to preserve data
+ * consistency. The phase should not start until all dirty pages are
written by the checkpoint writer, but for page replacement we
+ * must block it ourselves.</p>
+ *
+ * @param fullPageId Candidate page ID.
+ * @param absPtr Absolute pointer to the candidate page.
+ * @return {@code True} if the page replacement was successful,
otherwise need to try another one.
+ * @throws IgniteInternalCheckedException If any error occurred while
waiting for the dirty page sorting phase to complete at a
+ * checkpoint.
*/
public boolean tryToRemovePage(FullPageId fullPageId, long absPtr)
throws IgniteInternalCheckedException {
assert writeLock().isHeldByCurrentThread();
if (isAcquired(absPtr)) {
+ // Page is pinned by another thread, such as a checkpoint
dirty page writer or in the process of being modified - nothing
+ // needs to be done.
return false;
}
if (isDirty(absPtr)) {
CheckpointPages checkpointPages = this.checkpointPages;
- // Can evict a dirty page only if should be written by a
checkpoint.
- // These pages does not have tmp buffer.
- if (checkpointPages != null &&
checkpointPages.allowToSave(fullPageId)) {
- WriteDirtyPage writeDirtyPage =
delayedPageReplacementTracker.delayedPageWrite();
-
- writeDirtyPage.write(PersistentPageMemory.this,
fullPageId, wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()));
+ // Can replace a dirty page only if it should be written by a
checkpoint.
+ // Safe to invoke because we keep segment write lock and the
checkpoint writer must remove pages on the segment read lock.
+ if (checkpointPages != null &&
checkpointPages.removeOnPageReplacement(fullPageId)) {
+ checkpointPages.blockFsyncOnPageReplacement(fullPageId);
+
+ DelayedDirtyPageWrite delayedDirtyPageWrite =
delayedPageReplacementTracker.delayedPageWrite();
+
+ delayedDirtyPageWrite.copyPageToTemporaryBuffer(
+ PersistentPageMemory.this,
+ fullPageId,
+ wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()),
+ checkpointPages
+ );
setDirty(fullPageId, absPtr, false, true);
- checkpointPages.markAsSaved(fullPageId);
-
loadedPages.remove(fullPageId.groupId(),
fullPageId.effectivePageId());
return true;
@@ -1826,23 +1852,23 @@ public class PersistentPageMemory implements PageMemory
{
/**
* Returns {@code true} if remove successfully.
*
- * @param fullPageId Page ID to clear.
+ * @param fullPageId Page ID to remove.
*/
- boolean clearCheckpoint(FullPageId fullPageId) {
+ private boolean removeOnCheckpoint(FullPageId fullPageId) {
Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
CheckpointPages pages0 = seg.checkpointPages;
- assert pages0 != null;
+ assert pages0 != null : fullPageId;
- return pages0.markAsSaved(fullPageId);
+ return pages0.removeOnCheckpoint(fullPageId);
}
/**
* Makes a full copy of the dirty page for checkpointing, then marks the
page as not dirty.
*
* @param absPtr Absolute page pointer.
- * @param fullId Full page id.
+ * @param fullId Full page ID.
* @param buf Buffer for copy page content for future write via {@link
PageStoreWriter}.
* @param pageSingleAcquire Page is acquired only once. We don't pin the
page second time (until page will not be copied) in case
* checkpoint temporary buffer is used.
@@ -1883,7 +1909,7 @@ public class PersistentPageMemory implements PageMemory {
return;
}
- if (!clearCheckpoint(fullId)) {
+ if (!removeOnCheckpoint(fullId)) {
rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
if (!pageSingleAcquire) {
@@ -1946,8 +1972,8 @@ public class PersistentPageMemory implements PageMemory {
/**
* Prepare page for write during checkpoint. {@link PageStoreWriter} will
be called when the page will be ready to write.
*
- * @param fullId Page ID to get byte buffer for. The page ID must be
present in the collection returned by the {@link
- * #beginCheckpoint(CompletableFuture)} method call.
+ * @param fullId Page ID to get byte buffer for. The page ID must be
present in the collection returned by the {@link #beginCheckpoint}
+ * method call.
* @param buf Temporary buffer to write changes into.
* @param pageStoreWriter Checkpoint page write context.
* @param tracker Checkpoint metrics tracker.
@@ -2067,11 +2093,11 @@ public class PersistentPageMemory implements PageMemory
{
* begun, the modifications will be written to a temporary buffer which
will be flushed to the main memory after the checkpointing
* finished. This method must be called when no concurrent operations on
pages are performed.
*
- * @param allowToReplace The sign which allows replacing pages from a
checkpoint by page replacer.
+ * @param checkpointProgress Progress of the current checkpoint.
* @return Collection view of dirty page IDs.
* @throws IgniteInternalException If checkpoint has been already started
and was not finished.
*/
- public Collection<FullPageId> beginCheckpoint(CompletableFuture<?>
allowToReplace) throws IgniteInternalException {
+ public Collection<FullPageId> beginCheckpoint(CheckpointProgress
checkpointProgress) throws IgniteInternalException {
if (segments == null) {
return List.of();
}
@@ -2081,11 +2107,15 @@ public class PersistentPageMemory implements PageMemory
{
for (int i = 0; i < segments.length; i++) {
Segment segment = segments[i];
- assert segment.checkpointPages == null : "Failed to begin
checkpoint (it is already in progress)";
+ assert segment.checkpointPages == null : String.format(
+ "Failed to begin checkpoint (it is already in progress):
[storageProfile=%s, segmentIdx=%s]",
+ storageProfileView.name(), i
+ );
- Set<FullPageId> segmentDirtyPages = (dirtyPageIds[i] =
segment.dirtyPages);
+ Set<FullPageId> segmentDirtyPages = segment.dirtyPages;
+ dirtyPageIds[i] = segmentDirtyPages;
- segment.checkpointPages = new CheckpointPages(segmentDirtyPages,
allowToReplace);
+ segment.checkpointPages = new CheckpointPages(segmentDirtyPages,
checkpointProgress);
segment.resetDirtyPages();
}
@@ -2123,4 +2153,10 @@ public class PersistentPageMemory implements PageMemory {
return checkpointPool.size() > checkpointBufLimit;
}
+
+ /**
+ * Returns {@code true} if a page replacement has occurred at least once.
+ *
+ * <p>Implemented as a check that the {@code pageReplacementWarned} field is greater than zero. NOTE(review): this assumes the field
+ * is incremented whenever a page replacement happens; confirm against the code that writes it.</p>
+ */
+ @TestOnly
+ public boolean pageReplacementOccurred() {
+ return pageReplacementWarned > 0;
+ }
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 9bb883597c..4d85b72087 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -309,7 +309,6 @@ public class CheckpointManager {
CheckpointProgress lastCheckpointProgress = lastCheckpointProgress();
assert lastCheckpointProgress != null : "Checkpoint has not happened
yet";
- // TODO https://issues.apache.org/jira/browse/IGNITE-23212 This
assertion fails sometimes.
assert lastCheckpointProgress.inProgress() : "Checkpoint must be in
progress";
CheckpointDirtyPages pagesToWrite =
lastCheckpointProgress.pagesToWrite();
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java
new file mode 100644
index 0000000000..782c3fe34f
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacement.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for synchronizing page replacement and the beginning of the fsync phase at a checkpoint.
+ *
+ * <p>For data consistency, it is important for us that the fsync delta file phase begins strictly after all page replacements are
+ * completed.</p>
+ *
+ * <p>Usage:</p>
+ * <ul>
+ *     <li>{@link #block} - before you need to perform a page replacement.</li>
+ *     <li>{@link #unblock} - after the page replacement has finished and written to disk. The method must be invoked even if any error
+ *     occurred, so as not to hang a checkpoint.</li>
+ *     <li>{@link #stopBlocking} - must be invoked before the start of the fsync phase on the checkpoint and wait for the future to
+ *     complete in order to safely perform the phase.</li>
+ * </ul>
+ *
+ * <p>Thread safe.</p>
+ */
+class CheckpointPageReplacement {
+    /** IDs of pages for which page replacement is in progress. */
+    private final Set<FullPageId> pageIds = ConcurrentHashMap.newKeySet();
+
+    /** Future returned by {@link #stopBlocking}: completes once no replacement is pending, or with the first replacement error. */
+    private final CompletableFuture<Void> stopBlockingFuture = new CompletableFuture<>();
+
+    /** Guards {@link #block}; blocked by {@link #stopBlocking} so that no new replacement can be registered afterwards. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Ensures {@link #busyLock} is blocked only once, even if {@link #stopBlocking} is invoked repeatedly. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /**
+     * Blocks the start of the fsync phase at a checkpoint before replacing the page.
+     *
+     * <p>It is expected that the method will be invoked once and after that the {@link #unblock} will be invoked on the same page.</p>
+     *
+     * <p>It is expected that the method will not be invoked after {@link #stopBlocking}, since by the start of the fsync phase, writing
+     * of dirty pages at the checkpoint should be complete and no new page replacements should be started.</p>
+     *
+     * @param pageId Page ID for which page replacement will begin.
+     * @throws IllegalStateException If invoked after {@link #stopBlocking} while assertions are disabled (with assertions enabled an
+     *      {@link AssertionError} is thrown instead). In that case the page is not registered.
+     * @see #unblock(FullPageId, Throwable)
+     * @see #stopBlocking()
+     */
+    void block(FullPageId pageId) {
+        boolean enterBusy = busyLock.enterBusy();
+
+        assert enterBusy : "Method should not be invoked after the fsync phase has started for any page: " + pageId;
+
+        if (!enterBusy) {
+            // Only reachable with assertions disabled. The busy lock was not entered, so leaveBusy() must not be called, and the
+            // page must not be registered: the fsync phase may already have been allowed to start and may not wait for it.
+            throw new IllegalStateException(
+                    "Method should not be invoked after the fsync phase has started for any page: " + pageId
+            );
+        }
+
+        try {
+            boolean added = pageIds.add(pageId);
+
+            assert added : "Page is already in the process of being replaced: " + pageId;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Unblocks the start of the fsync phase at a checkpoint after the page replacement is completed.
+     *
+     * <p>It is expected that the method will be invoked once and after the {@link #block} for the same page ID.</p>
+     *
+     * <p>The fsync phase will only be started after page replacement has been completed for all pages for which {@link #block} was
+     * invoked before {@link #stopBlocking} was invoked, or no page replacement occurred at all.</p>
+     *
+     * <p>If an error occurs on any page replacement during one checkpoint, the future from {@link #stopBlocking} will complete with the
+     * first error.</p>
+     *
+     * <p>The method must be invoked even if any error occurred, so as not to hang a checkpoint.</p>
+     *
+     * @param pageId Page ID for which the page replacement has ended.
+     * @param error Error on page replacement, {@code null} if missing.
+     * @see #block(FullPageId)
+     * @see #stopBlocking()
+     */
+    void unblock(FullPageId pageId, @Nullable Throwable error) {
+        boolean removed = pageIds.remove(pageId);
+
+        assert removed : "Replacement for the page either did not start or ended: " + pageId;
+
+        if (error != null) {
+            stopBlockingFuture.completeExceptionally(error);
+
+            return;
+        }
+
+        // A failed enterBusy() means stopBlocking() has already run, so the last finished replacement must release the future.
+        if (!busyLock.enterBusy()) {
+            if (pageIds.isEmpty()) {
+                stopBlockingFuture.complete(null);
+            }
+        } else {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops new blocks before the fsync phase starts at a checkpoint.
+     *
+     * <p>Safe to invoke more than once: the busy lock is blocked only on the first call and the same future is always returned.</p>
+     *
+     * @return Future that will be completed successfully if all {@link #block} are completed before the current method is invoked, or
+     *      if there were none, or with an error from the first failed {@link #unblock}.
+     * @see #block(FullPageId)
+     * @see #unblock(FullPageId, Throwable)
+     */
+    CompletableFuture<Void> stopBlocking() {
+        if (stopGuard.compareAndSet(false, true)) {
+            busyLock.block();
+        }
+
+        if (pageIds.isEmpty()) {
+            stopBlockingFuture.complete(null);
+        }
+
+        return stopBlockingFuture;
+    }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
index dd7c139ba2..f6897a03b8 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -17,75 +17,184 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.jetbrains.annotations.Nullable;
/**
- * View of pages which should be stored during current checkpoint.
+ * Class contains dirty pages of the segment that will need to be written at a checkpoint or page replacement. It also contains helper
+ * methods before writing pages.
+ *
+ * <p>For correct parallel operation of the checkpoint writer and page replacement, external synchronization must be used.</p>
+ *
+ * @see PersistentPageMemory#checkpointWritePage(FullPageId, ByteBuffer, PageStoreWriter, CheckpointMetricsTracker)
+ * @see PersistentPageMemory.Segment#tryToRemovePage(FullPageId, long)
*/
public class CheckpointPages {
- private final Set<FullPageId> segmentPages;
+ private final Set<FullPageId> pageIds;
- private final CompletableFuture<?> allowToReplace;
+ private final CheckpointProgressImpl checkpointProgress;
/**
* Constructor.
*
- * @param pages Pages which would be stored to disk in current checkpoint, does not copy the set.
- * @param replaceFuture The sign which allows replacing pages from a checkpoint by page replacer.
+ * @param pageIds Dirty page IDs in the segment that should be written at a checkpoint or page replacement; the set is not copied.
+ * @param checkpointProgress Progress of the current checkpoint at which the object was created.
*/
- public CheckpointPages(Set<FullPageId> pages, CompletableFuture<?>
replaceFuture) {
- segmentPages = pages;
- allowToReplace = replaceFuture;
+ public CheckpointPages(Set<FullPageId> pageIds, CheckpointProgress
checkpointProgress) {
+ this.pageIds = pageIds;
+ // NOTE(review): unchecked downcast; a CheckpointProgress that is not a CheckpointProgressImpl fails here with ClassCastException.
+ this.checkpointProgress = (CheckpointProgressImpl) checkpointProgress;
}
/**
- * Returns {@code true} If fullPageId is allowable to store to disk.
+ * Removes a page ID that would be written at page replacement. Must be invoked before writing a page to disk.
+ *
+ * <p>Page will be removed only after the dirty page sorting phase at the checkpoint has completed; otherwise the method synchronously
+ * waits for it to complete.</p>
*
- * @param fullPageId Page id for checking.
- * @throws IgniteInternalCheckedException If the waiting sign which allows replacing pages from a checkpoint by page replacer fails.
+ * <p>To keep the data consistent, we need to use {@link #blockFsyncOnPageReplacement} and {@link #unblockFsyncOnPageReplacement} after
+ * calling the current method to prevent the fsync checkpoint phase from starting.</p>
+ *
+ * @param pageId Page ID to remove.
+ * @return {@code True} if the page was removed by the current method invoke, {@code false} if the page was already removed by another
+ * removal or did not exist.
+ * @throws IgniteInternalCheckedException If any error occurred while waiting for the dirty page sorting phase to complete at a
+ * checkpoint.
+ * @see #removeOnCheckpoint(FullPageId)
+ * @see #blockFsyncOnPageReplacement(FullPageId)
+ * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
*/
- public boolean allowToSave(FullPageId fullPageId) throws
IgniteInternalCheckedException {
+ public boolean removeOnPageReplacement(FullPageId pageId) throws
IgniteInternalCheckedException {
try {
// Uninterruptibly is important because otherwise in case of interrupt of client thread node would be stopped.
- getUninterruptibly(allowToReplace);
+ getUninterruptibly(checkpointProgress.futureFor(PAGES_SORTED));
} catch (ExecutionException e) {
throw new IgniteInternalCheckedException(e.getCause());
} catch (CancellationException e) {
throw new IgniteInternalCheckedException(e);
}
- return segmentPages.contains(fullPageId);
+ return pageIds.remove(pageId);
}
/**
- * Returns {@code true} If fullPageId is candidate to stored to disk by current checkpoint.
+ * Removes a page ID that would be written at checkpoint. Must be invoked before writing a page to disk.
+ *
+ * <p>We don't need to block the fsync phase of the checkpoint, since it won't start until all dirty pages have been written at the
+ * checkpoint, except those for which page replacement has occurred.</p>
*
- * @param fullPageId Page id for checking.
+ * @param pageId Page ID to remove.
+ * @return {@code True} if the page was removed by the current method invoke, {@code false} if the page was already removed by another
+ * removal or did not exist.
+ * @see #removeOnPageReplacement(FullPageId)
*/
- public boolean contains(FullPageId fullPageId) {
- return segmentPages.contains(fullPageId);
+ public boolean removeOnCheckpoint(FullPageId pageId) {
+ return pageIds.remove(pageId);
}
/**
- * Returns {@code true} if it is marking was successful.
+ * Returns {@code true} if the page has not yet been written by a checkpoint or page replacement.
*
- * @param fullPageId Page id which should be marked as saved to disk.
+ * @param pageId Page ID for checking.
*/
- public boolean markAsSaved(FullPageId fullPageId) {
- return segmentPages.remove(fullPageId);
+ public boolean contains(FullPageId pageId) {
+ return pageIds.contains(pageId);
+ }
+
+ /** Returns the current number of pages that will be written at a checkpoint or page replacement. */
+ public int size() {
+ return pageIds.size();
}
/**
- * Returns size of all pages in current checkpoint.
+ * Blocks the start of the fsync phase at a checkpoint before replacing the page.
+ *
+ * <p>It is expected that the method will be invoked once and after that the {@link #unblockFsyncOnPageReplacement} will be invoked on
+ * the same page.</p>
+ *
+ * @param pageId Page ID for which page replacement will begin.
+ * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
+ * @see #removeOnPageReplacement(FullPageId)
*/
- public int size() {
- return segmentPages.size();
+ public void blockFsyncOnPageReplacement(FullPageId pageId) {
+ checkpointProgress.blockFsyncOnPageReplacement(pageId);
+ }
+
+ /**
+ * Unblocks the start of the fsync phase at a checkpoint after the page replacement is completed.
+ *
+ * <p>It is expected that the method will be invoked once and after the {@link #blockFsyncOnPageReplacement} for the same page ID.</p>
+ *
+ * <p>The fsync phase will only be started after page replacement has been completed for all pages for which
+ * {@link #blockFsyncOnPageReplacement} was invoked or no page replacement occurred at all.</p>
+ *
+ * <p>The method must be invoked even if any error occurred, so as not to hang a checkpoint.</p>
+ *
+ * @param pageId Page ID for which the page replacement has ended.
+ * @param error Error on page replacement, {@code null} if missing.
+ * @see #blockFsyncOnPageReplacement(FullPageId)
+ * @see #removeOnPageReplacement(FullPageId)
+ */
+ public void unblockFsyncOnPageReplacement(FullPageId pageId, @Nullable
Throwable error) {
+ checkpointProgress.unblockFsyncOnPageReplacement(pageId, error);
+ }
+
+ /**
+ * Blocks physical destruction of partition.
+ *
+ * <p>When the intention to destroy partition appears, {@link FilePageStore#isMarkedToDestroy()} is set to {@code true} and
+ * {@link PersistentPageMemory#invalidate(int, int)} is invoked at the beginning. And if there is a block, it waits for unblocking.
+ * Then it destroys the partition, {@link FilePageStoreManager#getStore(GroupPartitionId)} will return {@code null}.</p>
+ *
+ * <p>It is recommended to use where physical destruction of the partition may have an impact, for example when writing dirty pages and
+ * executing a fsync.</p>
+ *
+ * <p>To make sure that we can physically do something with the partition during a block, we will need to use approximately the
+ * following code:</p>
+ * <pre><code>
+ * checkpointPages.blockPartitionDestruction(partitionId);
+ *
+ * try {
+ *     FilePageStore pageStore = filePageStoreManager.getStore(partitionId);
+ *
+ *     if (pageStore == null || pageStore.isMarkedToDestroy()) {
+ *         return;
+ *     }
+ *
+ *     someAction(pageStore);
+ * } finally {
+ *     checkpointPages.unblockPartitionDestruction(partitionId);
+ * }
+ * </code></pre>
+ *
+ * @param groupPartitionId Pair of group ID with partition ID.
+ * @see #unblockPartitionDestruction(GroupPartitionId)
+ */
+ public void blockPartitionDestruction(GroupPartitionId groupPartitionId) {
+ checkpointProgress.blockPartitionDestruction(groupPartitionId);
+ }
+
+ /**
+ * Unblocks physical destruction of partition.
+ *
+ * <p>As soon as the last thread makes an unlock, the physical destruction of the partition can immediately begin.</p>
+ *
+ * @param groupPartitionId Pair of group ID with partition ID.
+ * @see #blockPartitionDestruction(GroupPartitionId)
+ */
+ public void unblockPartitionDestruction(GroupPartitionId groupPartitionId)
{
+ checkpointProgress.unblockPartitionDestruction(groupPartitionId);
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index c258f5ba11..f8a5c5abc9 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -76,6 +77,9 @@ class CheckpointProgressImpl implements CheckpointProgress {
/** Partitions currently being processed, for example, writing dirty pages
or doing fsync. */
private final PartitionProcessingCounterMap processedPartitionMap = new
PartitionProcessingCounterMap();
+ /** Assistant for synchronizing page replacement and fsync phase. */
+ private final CheckpointPageReplacement checkpointPageReplacement = new
CheckpointPageReplacement();
+
/**
* Constructor.
*
@@ -337,4 +341,58 @@ class CheckpointProgressImpl implements CheckpointProgress
{
public @Nullable CompletableFuture<Void>
getUnblockPartitionDestructionFuture(GroupPartitionId groupPartitionId) {
return
processedPartitionMap.getProcessedPartitionFuture(groupPartitionId);
}
+
+ /**
+ * Blocks the start of the fsync phase at a checkpoint before replacing the page.
+ *
+ * <p>It is expected that the method will be invoked once and after that the {@link #unblockFsyncOnPageReplacement} will be invoked on
+ * the same page.</p>
+ *
+ * <p>It is expected that the method will not be invoked after {@link #getUnblockFsyncOnPageReplacementFuture}, since by the start of
+ * the fsync phase, writing of dirty pages at the checkpoint should be complete and no new page replacements should be started.</p>
+ *
+ * <p>Delegates to {@link CheckpointPageReplacement#block(FullPageId)}.</p>
+ *
+ * @param pageId Page ID for which page replacement is expected to begin.
+ * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
+ * @see #getUnblockFsyncOnPageReplacementFuture()
+ */
+ void blockFsyncOnPageReplacement(FullPageId pageId) {
+ checkpointPageReplacement.block(pageId);
+ }
+
+ /**
+ * Unblocks the start of the fsync phase at a checkpoint after the page replacement is completed.
+ *
+ * <p>It is expected that the method will be invoked once and after the {@link #blockFsyncOnPageReplacement} for the same page ID.</p>
+ *
+ * <p>The fsync phase will only be started after page replacement has been completed for all pages for which
+ * {@link #blockFsyncOnPageReplacement} was invoked before {@link #getUnblockFsyncOnPageReplacementFuture} was invoked, or no page
+ * replacement occurred at all.</p>
+ *
+ * <p>If an error occurs on any page replacement during one checkpoint, the future from {@link #getUnblockFsyncOnPageReplacementFuture}
+ * will complete with the first error.</p>
+ *
+ * <p>The method must be invoked even if any error occurred, so as not to hang a checkpoint.</p>
+ *
+ * <p>Delegates to {@link CheckpointPageReplacement#unblock(FullPageId, Throwable)}.</p>
+ *
+ * @param pageId Page ID for which the page replacement has ended.
+ * @param error Error on page replacement, {@code null} if missing.
+ * @see #blockFsyncOnPageReplacement(FullPageId)
+ * @see #getUnblockFsyncOnPageReplacementFuture()
+ */
+ void unblockFsyncOnPageReplacement(FullPageId pageId, @Nullable Throwable
error) {
+ checkpointPageReplacement.unblock(pageId, error);
+ }
+
+ /**
+ * Stops accepting new page replacement blocks and returns the future to wait for before the fsync phase.
+ *
+ * <p>The returned future completes successfully once every page replacement registered via {@link #blockFsyncOnPageReplacement} has
+ * been finished with {@link #unblockFsyncOnPageReplacement} (immediately if there were none), or exceptionally with the first error
+ * passed to {@link #unblockFsyncOnPageReplacement}. After this call, {@link #blockFsyncOnPageReplacement} must no longer be invoked.
+ * Repeated calls return the same future.</p>
+ *
+ * <p>Must be invoked before the start of the fsync phase at the checkpoint and wait for the future to complete in order to safely
+ * perform the phase.</p>
+ *
+ * @see #blockFsyncOnPageReplacement(FullPageId)
+ * @see #unblockFsyncOnPageReplacement(FullPageId, Throwable)
+ */
+ CompletableFuture<Void> getUnblockFsyncOnPageReplacementFuture() {
+ return checkpointPageReplacement.stopBlocking();
+ }
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index ce5a4c9dbc..2f8415a544 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -42,7 +42,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@@ -258,7 +257,7 @@ class CheckpointWorkflow {
tracker.onMarkCheckpointBeginEnd();
// Page replacement is allowed only after sorting dirty pages.
- dirtyPages = beginCheckpoint(dataRegions,
curr.futureFor(PAGES_SORTED));
+ dirtyPages = beginCheckpoint(curr);
curr.currentCheckpointPagesCount(dirtyPages.dirtyPageCount);
@@ -370,10 +369,7 @@ class CheckpointWorkflow {
.collect(toUnmodifiableList());
}
- private DataRegionsDirtyPages beginCheckpoint(
- Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
- CompletableFuture<?> allowToReplace
- ) {
+ private DataRegionsDirtyPages beginCheckpoint(CheckpointProgressImpl
checkpointProgress) {
assert checkpointReadWriteLock.isWriteLockHeldByCurrentThread();
Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap =
this.dirtyPartitionsMap;
@@ -384,7 +380,7 @@ class CheckpointWorkflow {
// First, we iterate all regions that have dirty pages.
for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
- Collection<FullPageId> dirtyPages =
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
+ Collection<FullPageId> dirtyPages =
dataRegion.pageMemory().beginCheckpoint(checkpointProgress);
Set<FullPageId> dirtyMetaPageIds =
dirtyPartitionsMap.remove(dataRegion);
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index b65a6660c0..9385ebf3bd 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -437,7 +437,7 @@ public class Checkpointer extends IgniteWorker {
* @param shutdownNow Checker of stop operation.
* @throws IgniteInternalCheckedException If failed.
*/
- boolean writePages(
+ private boolean writePages(
CheckpointMetricsTracker tracker,
CheckpointDirtyPages checkpointDirtyPages,
CheckpointProgressImpl currentCheckpointProgress,
@@ -484,8 +484,21 @@ public class Checkpointer extends IgniteWorker {
// Wait and check for errors.
CompletableFuture.allOf(futures).join();
- // Must re-check shutdown flag here because threads may have skipped
some pages.
- // If so, we should not put finish checkpoint mark.
+ // Must re-check shutdown flag here because threads may have skipped
some pages because of it.
+ // If so, we should not finish checkpoint.
+ if (shutdownNow.getAsBoolean()) {
+ currentCheckpointProgress.fail(new NodeStoppingException("Node is
stopping."));
+
+ return false;
+ }
+
+ // Waiting for the completion of all page replacements if present.
+ // Will complete normally or with the first error on one of the page
replacements.
+ // join() is used intentionally as above.
+
currentCheckpointProgress.getUnblockFsyncOnPageReplacementFuture().join();
+
+ // Must re-check shutdown flag here because threads could take a long
time to complete the page replacement.
+ // If so, we should not finish checkpoint.
if (shutdownNow.getAsBoolean()) {
currentCheckpointProgress.fail(new NodeStoppingException("Node is
stopping."));
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
index 44af8c084d..84536da010 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
@@ -23,16 +23,26 @@ import static
org.apache.ignite.internal.util.GridUnsafe.copyMemory;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
import org.jetbrains.annotations.Nullable;
/**
- * Not thread safe and stateful class for page replacement of one page with
write() delay. This allows to write page content without holding
- * segment lock. Page data is copied into temp buffer during {@link
#write(PersistentPageMemory, FullPageId, ByteBuffer)} and then sent to
- * real implementation by {@link #finishReplacement}.
+ * Stateful class for page replacement of one page with write() delay. This
allows to write page content without holding
+ * segment write lock, which allows for a long time not to block the segment
read lock due to IO writes when reading pages or writing dirty
+ * pages at a checkpoint.
+ *
+ * <p>Usage:</p>
+ * <ul>
+ * <li>On page replacement, invoke {@link #copyPageToTemporaryBuffer}.</li>
+ * <li>After releasing the segment write lock, invoke {@link
#flushCopiedPageIfExists}.</li>
+ * </ul>
+ *
+ * <p>Not thread safe.</p>
*/
-public class DelayedDirtyPageWrite implements WriteDirtyPage {
+public class DelayedDirtyPageWrite {
/** Real flush dirty page implementation. */
private final WriteDirtyPage flushDirtyPage;
@@ -45,21 +55,26 @@ public class DelayedDirtyPageWrite implements
WriteDirtyPage {
/** Replacing pages tracker, used to register & unregister pages being
written. */
private final DelayedPageReplacementTracker tracker;
- /** Full page id to be written on {@link #finishReplacement} or {@code
null} if nothing to write. */
+ /** Full page id to be written on {@link #flushCopiedPageIfExists}, {@code
null} if nothing to write. */
private @Nullable FullPageId fullPageId;
- /** Page memory to be used in {@link #finishReplacement}. */
+ /** Page memory to be used in {@link #flushCopiedPageIfExists}, {@code
null} if nothing to write. */
private @Nullable PersistentPageMemory pageMemory;
+ /**
+ * Dirty pages of the segment that need to be written at the current
checkpoint or page replacement, {@code null} if nothing to write.
+ */
+ private @Nullable CheckpointPages checkpointPages;
+
/**
* Constructor.
*
- * @param flushDirtyPage real writer to save page to store.
- * @param byteBufThreadLoc thread local buffers to use for pages copying.
- * @param pageSize page size.
- * @param tracker tracker to lock/unlock page reads.
+ * @param flushDirtyPage Real writer to IO write page to store.
+ * @param byteBufThreadLoc Thread local buffers to use for pages copying.
+ * @param pageSize Page size in bytes.
+ * @param tracker Tracker to lock/unlock page reads.
*/
- public DelayedDirtyPageWrite(
+ DelayedDirtyPageWrite(
WriteDirtyPage flushDirtyPage,
ThreadLocal<ByteBuffer> byteBufThreadLoc,
// TODO: IGNITE-17017 Move to common config
@@ -72,41 +87,71 @@ public class DelayedDirtyPageWrite implements
WriteDirtyPage {
this.tracker = tracker;
}
- /** {@inheritDoc} */
- @Override
- public void write(PersistentPageMemory pageMemory, FullPageId fullPageId,
ByteBuffer byteBuf) {
- tracker.lock(fullPageId);
+ /**
+ * Copies a page to a temporary buffer on page replacement.
+ *
+ * <p>The page is locked in the replacement tracker, and {@code pageSize} bytes are copied from {@code originPageBuf} into the
+ * thread-local buffer. The tracker lock is expected to be released, and the copied bytes written, by
+ * {@link #flushCopiedPageIfExists()}.</p>
+ *
+ * @param pageMemory Persistent page memory for subsequent page IO writes.
+ * @param pageId ID of the copied page.
+ * @param originPageBuf Buffer with the full contents of the page being copied (from which we will copy).
+ * @param checkpointPages Dirty pages of the segment that need to be written at the current checkpoint or page replacement.
+ * @see #flushCopiedPageIfExists()
+ */
+ public void copyPageToTemporaryBuffer(
+ PersistentPageMemory pageMemory,
+ FullPageId pageId,
+ ByteBuffer originPageBuf,
+ CheckpointPages checkpointPages
+ ) {
+ tracker.lock(pageId);
ByteBuffer threadLocalBuf = byteBufThreadLoc.get();
threadLocalBuf.rewind();
- long writeAddr = bufferAddress(threadLocalBuf);
- long origBufAddr = bufferAddress(byteBuf);
+ long dstBufAddr = bufferAddress(threadLocalBuf);
+ long srcBufAddr = bufferAddress(originPageBuf);
- copyMemory(origBufAddr, writeAddr, pageSize);
+ copyMemory(srcBufAddr, dstBufAddr, pageSize);
- this.fullPageId = fullPageId;
+ this.fullPageId = pageId;
this.pageMemory = pageMemory;
+ this.checkpointPages = checkpointPages;
}
/**
- * Runs actual write if required. Method is 'no op' if there was no page
selected for replacement.
+ * Flushes a previously copied page to disk if it was copied.
*
- * @throws IgniteInternalCheckedException if write failed.
+ * @throws IgniteInternalCheckedException If write failed.
+ * @see #copyPageToTemporaryBuffer(PersistentPageMemory, FullPageId,
ByteBuffer, CheckpointPages)
*/
- public void finishReplacement() throws IgniteInternalCheckedException {
- if (fullPageId == null && pageMemory == null) {
+ public void flushCopiedPageIfExists() throws
IgniteInternalCheckedException {
+ if (fullPageId == null) {
return;
}
+ assert pageMemory != null : fullPageId;
+ assert checkpointPages != null : fullPageId;
+
+ Throwable errorOnWrite = null;
+
+
checkpointPages.blockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+
try {
flushDirtyPage.write(pageMemory, fullPageId,
byteBufThreadLoc.get());
+ } catch (Throwable t) {
+ errorOnWrite = t;
+
+ throw t;
} finally {
+
checkpointPages.unblockPartitionDestruction(GroupPartitionId.convert(fullPageId));
+
+ checkpointPages.unblockFsyncOnPageReplacement(fullPageId,
errorOnWrite);
+
tracker.unlock(fullPageId);
fullPageId = null;
pageMemory = null;
+ checkpointPages = null;
}
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
index 1ed08d89ba..a2f603cf8f 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
@@ -163,7 +163,7 @@ public class DelayedPageReplacementTracker {
boolean add = locked.add(id);
- assert add : "Double locking of page for replacement is not
possible";
+ assert add : "Double locking of page for replacement is not
possible: " + id;
}
}
@@ -210,7 +210,7 @@ public class DelayedPageReplacementTracker {
synchronized (locked) {
boolean rmv = locked.remove(id);
- assert rmv : "Unlocking page ID never locked, id " + id;
+ assert rmv : "Unlocking page ID never locked, id: " + id;
if (locked.isEmpty()) {
hasLockedPages = false;
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacementTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacementTest.java
new file mode 100644
index 0000000000..9c778bc0cb
--- /dev/null
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPageReplacementTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.fullPageId;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.Test;
+
+/** For {@link CheckpointPageReplacement} testing. */
+public class CheckpointPageReplacementTest {
+ @Test
+ void testBlock() {
+ var checkpointPageReplacement = new CheckpointPageReplacement();
+
+ assertDoesNotThrow(() -> checkpointPageReplacement.block(fullPageId(0,
0)));
+ assertDoesNotThrow(() -> checkpointPageReplacement.block(fullPageId(0,
1)));
+
+ checkpointPageReplacement.stopBlocking();
+ }
+
+ @Test
+ void testUnblock() {
+ var checkpointPageReplacement = new CheckpointPageReplacement();
+
+ checkpointPageReplacement.block(fullPageId(0, 0));
+ checkpointPageReplacement.block(fullPageId(0, 1));
+ checkpointPageReplacement.block(fullPageId(0, 2));
+ checkpointPageReplacement.block(fullPageId(0, 3));
+ checkpointPageReplacement.block(fullPageId(0, 4));
+ checkpointPageReplacement.block(fullPageId(0, 5));
+
+ assertDoesNotThrow(() ->
checkpointPageReplacement.unblock(fullPageId(0, 0), null));
+ assertDoesNotThrow(() ->
checkpointPageReplacement.unblock(fullPageId(0, 1), new Throwable("from test
0")));
+ assertDoesNotThrow(() ->
checkpointPageReplacement.unblock(fullPageId(0, 2), new Throwable("from test
1")));
+
+ checkpointPageReplacement.stopBlocking();
+
+ assertDoesNotThrow(() ->
checkpointPageReplacement.unblock(fullPageId(0, 3), null));
+ assertDoesNotThrow(() ->
checkpointPageReplacement.unblock(fullPageId(0, 4), new Throwable("from test
0")));
+ assertDoesNotThrow(() ->
checkpointPageReplacement.unblock(fullPageId(0, 5), new Throwable("from test
1")));
+ }
+
+ @Test
+ void testStopBlocking() {
+ var checkpointPageReplacement = new CheckpointPageReplacement();
+
+ checkpointPageReplacement.block(fullPageId(0, 0));
+ checkpointPageReplacement.block(fullPageId(0, 1));
+
+ CompletableFuture<Void> stopBlockingFuture =
checkpointPageReplacement.stopBlocking();
+ assertFalse(stopBlockingFuture.isDone());
+
+ checkpointPageReplacement.unblock(fullPageId(0, 0), null);
+ assertFalse(stopBlockingFuture.isDone());
+
+ checkpointPageReplacement.unblock(fullPageId(0, 1), null);
+ assertTrue(stopBlockingFuture.isDone());
+
+ assertTrue(checkpointPageReplacement.stopBlocking().isDone());
+ }
+
+ @Test
+ void testStopBlockingError() {
+ var checkpointPageReplacement = new CheckpointPageReplacement();
+
+ checkpointPageReplacement.block(fullPageId(0, 0));
+ checkpointPageReplacement.block(fullPageId(0, 1));
+
+ CompletableFuture<Void> stopBlockingFuture =
checkpointPageReplacement.stopBlocking();
+ assertFalse(stopBlockingFuture.isDone());
+
+ checkpointPageReplacement.unblock(fullPageId(0, 0), new
RuntimeException("from test 1"));
+ assertThat(stopBlockingFuture, willThrow(RuntimeException.class, "from
test 1"));
+
+ checkpointPageReplacement.unblock(fullPageId(0, 1), new
TimeoutException("from test 2"));
+ assertThat(stopBlockingFuture, willThrow(RuntimeException.class, "from
test 1"));
+
+ assertThat(checkpointPageReplacement.stopBlocking(),
willThrow(RuntimeException.class, "from test 1"));
+ }
+
+ @Test
+ void testStopBlockingNoPageReplacement() {
+ assertTrue(new CheckpointPageReplacement().stopBlocking().isDone());
+ }
+}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
index 0a41d30b81..38b742d58f 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
@@ -17,34 +17,29 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.fullPageId;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Collections;
import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.junit.jupiter.api.Test;
-/**
- * For {@link CheckpointPages} testing.
- */
+/** For {@link CheckpointPages} testing. */
public class CheckpointPagesTest {
@Test
void testContains() {
- CheckpointPages checkpointPages = new CheckpointPages(
- Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
- nullCompletedFuture()
- );
+ CheckpointPages checkpointPages = createCheckpointPages(new
FullPageId(0, 0), new FullPageId(1, 0));
assertTrue(checkpointPages.contains(new FullPageId(0, 0)));
assertTrue(checkpointPages.contains(new FullPageId(1, 0)));
@@ -55,65 +50,84 @@ public class CheckpointPagesTest {
@Test
void testSize() {
- CheckpointPages checkpointPages = new CheckpointPages(
- Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
- nullCompletedFuture()
- );
+ CheckpointPages checkpointPages = createCheckpointPages(fullPageId(0,
0), fullPageId(1, 0));
assertEquals(2, checkpointPages.size());
}
@Test
- void testMarkAsSaved() {
- CheckpointPages checkpointPages = new CheckpointPages(
- new HashSet<>(Set.of(new FullPageId(0, 0), new FullPageId(1,
0), new FullPageId(2, 0))),
- nullCompletedFuture()
- );
+ void testRemoveOnCheckpoint() {
+ CheckpointPages checkpointPages = createCheckpointPages(fullPageId(0,
0), fullPageId(1, 0), fullPageId(2, 0));
- assertTrue(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+ assertTrue(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
assertEquals(2, checkpointPages.size());
- assertFalse(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+ assertFalse(checkpointPages.removeOnCheckpoint(fullPageId(0, 0)));
assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
assertEquals(2, checkpointPages.size());
- assertTrue(checkpointPages.markAsSaved(new FullPageId(1, 0)));
+ assertTrue(checkpointPages.removeOnCheckpoint(fullPageId(1, 0)));
assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
assertEquals(1, checkpointPages.size());
}
@Test
- void testAllowToSave() throws Exception {
- Set<FullPageId> pages = Set.of(new FullPageId(0, 0), new FullPageId(1,
0), new FullPageId(2, 0));
-
- CheckpointPages checkpointPages = new CheckpointPages(pages,
nullCompletedFuture());
+ void testRemoveOnPageReplacement() throws Exception {
+ var checkpointProgress = new CheckpointProgressImpl(10);
- assertTrue(checkpointPages.allowToSave(new FullPageId(0, 0)));
- assertTrue(checkpointPages.allowToSave(new FullPageId(1, 0)));
- assertTrue(checkpointPages.allowToSave(new FullPageId(2, 0)));
+ CheckpointPages checkpointPages =
createCheckpointPages(checkpointProgress, fullPageId(0, 0), fullPageId(1, 0));
- assertFalse(checkpointPages.allowToSave(new FullPageId(3, 0)));
+ // Let's make sure that the check will not complete until the dirty
page sorting phase completes.
+ checkpointProgress.transitTo(LOCK_RELEASED);
- IgniteInternalCheckedException exception = assertThrows(
- IgniteInternalCheckedException.class,
- () -> new CheckpointPages(pages, failedFuture(new
Exception("test"))).allowToSave(new FullPageId(0, 0))
+ CompletableFuture<Boolean> removeOnPageReplacementFuture = runAsync(
+ () -> checkpointPages.removeOnPageReplacement(fullPageId(0, 0))
);
+ assertThat(removeOnPageReplacementFuture, willTimeoutFast());
- assertThat(exception.getCause(), instanceOf(Exception.class));
- assertThat(exception.getCause().getMessage(), equalTo("test"));
+ checkpointProgress.transitTo(PAGES_SORTED);
+ assertThat(removeOnPageReplacementFuture, willBe(true));
+ assertFalse(checkpointPages.contains(fullPageId(0, 0)));
+ assertEquals(1, checkpointPages.size());
- exception = assertThrows(
- IgniteInternalCheckedException.class,
- () -> {
- CompletableFuture<Object> future = new
CompletableFuture<>();
+ assertFalse(checkpointPages.removeOnPageReplacement(fullPageId(0, 0)));
+ assertFalse(checkpointPages.contains(fullPageId(0, 0)));
+ assertEquals(1, checkpointPages.size());
- future.cancel(true);
+ assertTrue(checkpointPages.removeOnPageReplacement(fullPageId(1, 0)));
+ assertFalse(checkpointPages.contains(fullPageId(1, 0)));
+ assertEquals(0, checkpointPages.size());
+ }
+
+ @Test
+ void testRemoveOnPageReplacementErrorOnWaitPageSortingPhase() {
+ var checkpointProgress = new CheckpointProgressImpl(10);
- new CheckpointPages(pages, future).allowToSave(new
FullPageId(0, 0));
- }
+ CheckpointPages checkpointPages =
createCheckpointPages(checkpointProgress);
+
+ checkpointProgress.fail(new Exception("from test"));
+
+ assertThrows(
+ Exception.class,
+ () -> checkpointPages.removeOnPageReplacement(fullPageId(0,
0)),
+ "from test"
);
+ }
+
+ private static CheckpointPages createCheckpointPages(FullPageId...
pageIds) {
+ var checkpointProgress = new CheckpointProgressImpl(10);
+
+ checkpointProgress.transitTo(PAGES_SORTED);
+
+ return createCheckpointPages(checkpointProgress, pageIds);
+ }
+
+ private static CheckpointPages
createCheckpointPages(CheckpointProgressImpl checkpointProgress, FullPageId...
pageIds) {
+ var set = new HashSet<FullPageId>(pageIds.length);
+
+ Collections.addAll(set, pageIds);
- assertThat(exception.getCause(),
instanceOf(CancellationException.class));
+ return new CheckpointPages(set, checkpointProgress);
}
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 9e45b41e4b..2c6c00ccc0 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -665,7 +665,7 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
private static PersistentPageMemory newPageMemory(Collection<FullPageId>
pageIds) {
PersistentPageMemory mock = mock(PersistentPageMemory.class);
-
when(mock.beginCheckpoint(any(CompletableFuture.class))).thenReturn(pageIds);
+
when(mock.beginCheckpoint(any(CheckpointProgress.class))).thenReturn(pageIds);
return mock;
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
index 217e19b01f..0d4764f9d0 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import java.util.Arrays;
import org.apache.ignite.internal.pagememory.FullPageId;
@@ -37,4 +39,14 @@ class TestCheckpointUtils {
Arrays.stream(dirtyPages).map(GroupPartitionId::convert).collect(toSet())
);
}
+
+ /**
+ * Creates new full page ID.
+ *
+ * @param groupId Group ID.
+ * @param partitionId Partition ID.
+ */
+ static FullPageId fullPageId(int groupId, int partitionId) {
+ return new FullPageId(pageId(partitionId, FLAG_DATA, 0), groupId);
+ }
}
diff --git
a/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
index 1ed535d7cd..af9a9a9607 100644
---
a/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
+++
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.pagememory;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
import com.google.auto.service.AutoService;
import java.util.Collection;
@@ -26,6 +28,7 @@ import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.io.PageIoModule;
+import
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FakePartitionMetaIo;
/**
* Test implementation of {@link PageIoModule}.
@@ -35,13 +38,18 @@ public class TestPageIoModule implements PageIoModule {
/** Last possible value for IO type. */
public static final int TEST_PAGE_TYPE = PageIo.MAX_IO_TYPE;
+ /** {@link FakePartitionMetaIo} type. */
+ public static final int FAKE_PARTITION_META_PAGE_IO_TYPE = TEST_PAGE_TYPE
- 1;
+
+ /** {@link TestSimpleValuePageIo} type.*/
+ public static final int TEST_SIMPLE_VALUE_PAGE_IO_TYPE =
FAKE_PARTITION_META_PAGE_IO_TYPE - 1;
+
/** Version 1, minimal possible value. */
public static final int TEST_PAGE_VER = 1;
- /** {@inheritDoc} */
@Override
public Collection<IoVersions<?>> ioVersions() {
- return List.of(new IoVersions<>(new TestPageIo()));
+ return List.of(new IoVersions<>(new TestPageIo()),
FakePartitionMetaIo.VERSIONS, new IoVersions<>(new TestSimpleValuePageIo()));
}
/**
@@ -55,9 +63,39 @@ public class TestPageIoModule implements PageIoModule {
super(TEST_PAGE_TYPE, TEST_PAGE_VER, FLAG_DATA);
}
- /** {@inheritDoc} */
@Override
protected void printPage(long addr, int pageSize, IgniteStringBuilder
sb) {
}
}
+
+ /** Test implementation of {@link PageIo} with simple value (long). */
+ public static class TestSimpleValuePageIo extends PageIo {
+ private static final int LONG_VALUE_OFF = PageIo.COMMON_HEADER_END;
+
+ /** Constructor. */
+ public TestSimpleValuePageIo() {
+ super(TEST_SIMPLE_VALUE_PAGE_IO_TYPE, TEST_PAGE_VER, FLAG_DATA);
+ }
+
+ @Override
+ public void initNewPage(long pageAddr, long pageId, int pageSize) {
+ super.initNewPage(pageAddr, pageId, pageSize);
+
+ setLongValue(pageAddr, 0);
+ }
+
+ @Override
+ protected void printPage(long addr, int pageSize, IgniteStringBuilder
sb) {
+ }
+
+ /** Reads long value. */
+ public static long getLongValue(long pageAddr) {
+ return getLong(pageAddr, LONG_VALUE_OFF);
+ }
+
+ /** Writes long value. */
+ public static void setLongValue(long pageAddr, long value) {
+ putLong(pageAddr, LONG_VALUE_OFF, value);
+ }
+ }
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
similarity index 69%
rename from
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
rename to
modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
index a4f8b736c2..0a9f695d7a 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
+++
b/modules/page-memory/src/testFixtures/java/org/apache/ignite/internal/pagememory/persistence/FakePartitionMeta.java
@@ -17,24 +17,27 @@
package org.apache.ignite.internal.pagememory.persistence;
+import static
org.apache.ignite.internal.pagememory.TestPageIoModule.FAKE_PARTITION_META_PAGE_IO_TYPE;
+import static
org.apache.ignite.internal.pagememory.TestPageIoModule.TEST_PAGE_VER;
+
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.persistence.io.PartitionMetaIo;
import org.jetbrains.annotations.Nullable;
-/**
- * Simple implementation of {@link PartitionMeta} for testing purposes.
- */
+/** Simple implementation of {@link PartitionMeta} for testing purposes. */
public class FakePartitionMeta extends PartitionMeta {
+ public static final FakePartitionMetaFactory FACTORY = new
FakePartitionMetaFactory();
- public static final PartitionMetaFactory FACTORY = new
FakePartitionMetaFactory();
-
- /**
- * Constructor.
- */
+ /** Constructor. */
public FakePartitionMeta() {
- super(0);
+ this(0);
+ }
+
+ /** Constructor. */
+ public FakePartitionMeta(int pageCount) {
+ super(pageCount);
}
/**
@@ -51,7 +54,7 @@ public class FakePartitionMeta extends PartitionMeta {
@Override
protected FakePartitionMetaSnapshot buildSnapshot(@Nullable UUID
checkpointId) {
- return new FakePartitionMetaSnapshot(checkpointId);
+ return new FakePartitionMetaSnapshot(checkpointId, pageCount());
}
/**
@@ -60,12 +63,17 @@ public class FakePartitionMeta extends PartitionMeta {
public static class FakePartitionMetaSnapshot implements
PartitionMetaSnapshot {
private final UUID checkpointId;
- public FakePartitionMetaSnapshot(@Nullable UUID checkpointId) {
+ private final int pageCount;
+
+ FakePartitionMetaSnapshot(@Nullable UUID checkpointId, int pageCount) {
this.checkpointId = checkpointId;
+ this.pageCount = pageCount;
}
@Override
public void writeTo(PartitionMetaIo metaIo, long pageAddr) {
+ FakePartitionMetaIo fakePartitionMetaIo = (FakePartitionMetaIo)
metaIo;
+ fakePartitionMetaIo.setPageCount(pageAddr, pageCount);
}
@Override
@@ -79,14 +87,16 @@ public class FakePartitionMeta extends PartitionMeta {
*/
public static class FakePartitionMetaIo extends PartitionMetaIo {
/** I/O versions. */
- public static final IoVersions<FakePartitionMetaIo> VERSIONS = new
IoVersions<>(new FakePartitionMetaIo(1, 1));
+ public static final IoVersions<FakePartitionMetaIo> VERSIONS = new
IoVersions<>(
+ new FakePartitionMetaIo(FAKE_PARTITION_META_PAGE_IO_TYPE,
TEST_PAGE_VER)
+ );
/**
* Constructor.
*
* @param ver Page format version.
*/
- protected FakePartitionMetaIo(int type, int ver) {
+ FakePartitionMetaIo(int type, int ver) {
super(type, ver);
}
@@ -98,13 +108,11 @@ public class FakePartitionMeta extends PartitionMeta {
}
}
- /**
- * Simple implementation of {@link PartitionMetaFactory} for testing
purposes.
- */
+ /** Simple implementation of {@link PartitionMetaFactory} for testing
purposes. */
public static class FakePartitionMetaFactory implements
PartitionMetaFactory {
@Override
- public FakePartitionMeta createPartitionMeta(UUID checkpointId,
PartitionMetaIo metaIo, long pageAddr) {
- return new FakePartitionMeta().init(checkpointId);
+ public FakePartitionMeta createPartitionMeta(@Nullable UUID
checkpointId, PartitionMetaIo metaIo, long pageAddr) {
+ return new
FakePartitionMeta(metaIo.getPageCount(pageAddr)).init(checkpointId);
}
@Override