This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c8f6929690 IGNITE-23115 Checkpoint single partition from a single
thread (#4379)
c8f6929690 is described below
commit c8f692969004f38997fec60bff0f601e4cad948b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Sep 13 16:54:36 2024 +0300
IGNITE-23115 Checkpoint single partition from a single thread (#4379)
---
.../pagememory/persistence/GroupPartitionId.java | 31 +---
.../checkpoint/CheckpointDirtyPages.java | 100 +++-------
.../persistence/checkpoint/CheckpointManager.java | 13 +-
.../checkpoint/CheckpointPagesWriter.java | 205 ++++++++++++---------
.../checkpoint/CheckpointPagesWriterFactory.java | 7 +-
.../persistence/checkpoint/CheckpointWorkflow.java | 21 ++-
.../persistence/checkpoint/Checkpointer.java | 6 +-
.../checkpoint/DirtyPagesAndPartitions.java} | 32 ++--
.../checkpoint/CheckpointDirtyPagesTest.java | 127 +++++--------
.../checkpoint/CheckpointManagerTest.java | 17 +-
.../checkpoint/CheckpointPagesWriterTest.java | 57 ++++--
.../persistence/checkpoint/CheckpointTest.java | 7 +-
.../checkpoint/CheckpointWorkflowTest.java | 46 ++---
.../persistence/checkpoint/CheckpointerTest.java | 13 +-
...heckpointTest.java => TestCheckpointUtils.java} | 34 ++--
15 files changed, 332 insertions(+), 384 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
index 73c97a3671..5c30c3c267 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/GroupPartitionId.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.pagememory.persistence;
+import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.tostring.S;
/**
@@ -54,13 +55,11 @@ public class GroupPartitionId implements
Comparable<GroupPartitionId> {
return partId;
}
- /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(GroupPartitionId.class, this);
}
- /** {@inheritDoc} */
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -73,14 +72,10 @@ public class GroupPartitionId implements
Comparable<GroupPartitionId> {
GroupPartitionId key = (GroupPartitionId) o;
- if (grpId != key.grpId) {
- return false;
- }
+ return grpId == key.grpId && partId == key.partId;
- return partId == key.partId;
}
- /** {@inheritDoc} */
@Override
public int hashCode() {
int result = grpId;
@@ -90,25 +85,19 @@ public class GroupPartitionId implements
Comparable<GroupPartitionId> {
return result;
}
- /** {@inheritDoc} */
@Override
public int compareTo(GroupPartitionId o) {
- if (getGroupId() < o.getGroupId()) {
- return -1;
- }
+ int cmp = Integer.compare(getGroupId(), o.getGroupId());
- if (getGroupId() > o.getGroupId()) {
- return 1;
+ if (cmp != 0) {
+ return cmp;
}
- if (getPartitionId() < o.getPartitionId()) {
- return -1;
- }
-
- if (getPartitionId() > o.getPartitionId()) {
- return 1;
- }
+ return Integer.compare(getPartitionId(), o.getPartitionId());
+ }
- return 0;
+ /** Converts given full page ID to a {@link GroupPartitionId}. */
+ public static GroupPartitionId convert(FullPageId fullPageId) {
+ return new GroupPartitionId(fullPageId.groupId(),
fullPageId.partitionId());
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
index b87a4188e4..86086ffa8f 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
@@ -20,21 +20,18 @@ package
org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static java.util.Arrays.binarySearch;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
import java.util.Comparator;
import java.util.List;
import java.util.RandomAccess;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
import org.jetbrains.annotations.Nullable;
-/**
- * Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs that should be
- * checkpointed.
- */
+/** Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and partition IDs that should be checkpointed. */
class CheckpointDirtyPages {
/** Dirty page ID comparator by groupId -> partitionId -> pageIdx. */
static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
@@ -44,8 +41,8 @@ class CheckpointDirtyPages {
/** Empty checkpoint dirty pages. */
static final CheckpointDirtyPages EMPTY = new
CheckpointDirtyPages(List.of());
- /** Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs. */
- private final List<DataRegionDirtyPages<FullPageId[]>> dirtyPages;
+ /** Dirty pages and partitions of data regions, with sorted dirty page IDs
by {@link #DIRTY_PAGE_COMPARATOR}. */
+ private final List<DirtyPagesAndPartitions> dirtyPagesAndPartitions;
/** Total number of dirty page IDs. */
private final int dirtyPagesCount;
@@ -53,14 +50,15 @@ class CheckpointDirtyPages {
/**
* Constructor.
*
- * @param dirtyPages Dirty pages of data regions, with sorted page IDs by
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
+ * @param dirtyPagesAndPartitions Dirty pages and partitions of data
regions, with sorted dirty page IDs by
+ * {@link #DIRTY_PAGE_COMPARATOR}. Expected to be a list implementing {@link
RandomAccess}.
*/
- public CheckpointDirtyPages(List<DataRegionDirtyPages<FullPageId[]>>
dirtyPages) {
- assert dirtyPages instanceof RandomAccess : dirtyPages;
+ CheckpointDirtyPages(List<DirtyPagesAndPartitions>
dirtyPagesAndPartitions) {
+ assert dirtyPagesAndPartitions instanceof RandomAccess :
dirtyPagesAndPartitions;
- this.dirtyPages = dirtyPages;
+ this.dirtyPagesAndPartitions = dirtyPagesAndPartitions;
- dirtyPagesCount = dirtyPages.stream().mapToInt(pages ->
pages.dirtyPages.length).sum();
+ dirtyPagesCount = dirtyPagesAndPartitions.stream().mapToInt(pages ->
pages.dirtyPages.length).sum();
}
/**
@@ -70,15 +68,16 @@ class CheckpointDirtyPages {
return dirtyPagesCount;
}
- /**
- * Returns a queue of dirty page IDs to be written to a checkpoint.
- */
- public IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
toDirtyPageIdQueue() {
- List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>> dirtyPageIds =
dirtyPages.stream()
- .map(pages -> new IgniteBiTuple<>(pages.pageMemory,
pages.dirtyPages))
+ /** Creates a concurrent queue of dirty partitions to be written at
checkpoint. */
+ public IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> toDirtyPartitionQueue() {
+ List<IgniteBiTuple<PersistentPageMemory, GroupPartitionId[]>>
dirtyPartitions = dirtyPagesAndPartitions.stream()
+ .map(dirtyPagesAndPartitions -> new IgniteBiTuple<>(
+ dirtyPagesAndPartitions.pageMemory,
+
dirtyPagesAndPartitions.dirtyPartitions.toArray(GroupPartitionId[]::new))
+ )
.collect(toList());
- return new IgniteConcurrentMultiPairQueue<>(dirtyPageIds);
+ return new IgniteConcurrentMultiPairQueue<>(dirtyPartitions);
}
/**
@@ -89,8 +88,8 @@ class CheckpointDirtyPages {
* @param partId Partition ID.
*/
public @Nullable CheckpointDirtyPagesView
getPartitionView(PersistentPageMemory pageMemory, int grpId, int partId) {
- for (int i = 0; i < dirtyPages.size(); i++) {
- if (dirtyPages.get(i).pageMemory == pageMemory) {
+ for (int i = 0; i < dirtyPagesAndPartitions.size(); i++) {
+ if (dirtyPagesAndPartitions.get(i).pageMemory == pageMemory) {
return getPartitionView(i, grpId, partId);
}
}
@@ -102,7 +101,7 @@ class CheckpointDirtyPages {
FullPageId startPageId = new FullPageId(pageId(partId, (byte) 0, 0),
grpId);
FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0),
grpId);
- FullPageId[] pageIds = dirtyPages.get(dirtyPagesIdx).dirtyPages;
+ FullPageId[] pageIds =
dirtyPagesAndPartitions.get(dirtyPagesIdx).dirtyPages;
int fromIndex = binarySearch(pageIds, startPageId,
DIRTY_PAGE_COMPARATOR);
@@ -119,51 +118,12 @@ class CheckpointDirtyPages {
return new CheckpointDirtyPagesView(dirtyPagesIdx, fromIndex, toIndex);
}
- /**
- * Looks for the next dirty page IDs view from the current one, {@code
null} if not found.
- *
- * @param currentView Current view to dirty pages, {@code null} to get
first.
- */
- public @Nullable CheckpointDirtyPagesView nextPartitionView(@Nullable
CheckpointDirtyPagesView currentView) {
- assert currentView == null || currentView.owner() == this :
currentView;
-
- if (dirtyPages.isEmpty()) {
- return null;
- }
-
- int regionIndex;
- int fromPosition;
-
- if (currentView == null) {
- regionIndex = 0;
- fromPosition = 0;
- } else {
- regionIndex = currentView.needsNextRegion() ?
currentView.regionIndex + 1 : currentView.regionIndex;
- fromPosition = currentView.needsNextRegion() ? 0 :
currentView.toPosition;
- }
-
- if (regionIndex >= dirtyPages.size()) {
- return null;
- }
-
- FullPageId[] pageIds = dirtyPages.get(regionIndex).dirtyPages;
-
- FullPageId startPageId = pageIds[fromPosition];
- FullPageId endPageId = new
FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0),
startPageId.groupId());
-
- int toPosition = binarySearch(pageIds, fromPosition, pageIds.length,
endPageId, DIRTY_PAGE_COMPARATOR);
-
- toPosition = toPosition > 0 ? toPosition : -toPosition - 1;
-
- return new CheckpointDirtyPagesView(regionIndex, fromPosition,
toPosition);
- }
-
/**
* Returns a full list of {@link PersistentPageMemory} instances, for
which there exist at least a single dirty partition in current
* checkpoint.
*/
List<PersistentPageMemory> dirtyPageMemoryInstances() {
- return this.dirtyPages.stream().map(p ->
p.pageMemory).collect(toList());
+ return this.dirtyPagesAndPartitions.stream().map(p ->
p.pageMemory).collect(toList());
}
/**
@@ -173,7 +133,7 @@ class CheckpointDirtyPages {
* <p>Thread safe.
*/
class CheckpointDirtyPagesView {
- /** Element index in {@link CheckpointDirtyPages#dirtyPages}. */
+ /** Element index in {@link
CheckpointDirtyPages#dirtyPagesAndPartitions}. */
private final int regionIndex;
/** Starting position (inclusive) of the dirty page within the element
at {@link #regionIndex}. */
@@ -185,7 +145,7 @@ class CheckpointDirtyPages {
/**
* Private constructor.
*
- * @param regionIndex Element index in {@link
CheckpointDirtyPages#dirtyPages}.
+ * @param regionIndex Element index in {@link
CheckpointDirtyPages#dirtyPagesAndPartitions}.
* @param fromPosition Starting position (inclusive) of the dirty page
within the element at {@link #regionIndex}.
* @param toPosition End position (exclusive) of the dirty page within
the element at {@link #regionIndex}.
*/
@@ -201,14 +161,14 @@ class CheckpointDirtyPages {
* @param index Dirty page index.
*/
public FullPageId get(int index) {
- return dirtyPages.get(this.regionIndex).dirtyPages[fromPosition +
index];
+ return
dirtyPagesAndPartitions.get(this.regionIndex).dirtyPages[fromPosition + index];
}
/**
* Returns the page memory for view.
*/
public PersistentPageMemory pageMemory() {
- return dirtyPages.get(regionIndex).pageMemory;
+ return dirtyPagesAndPartitions.get(regionIndex).pageMemory;
}
/**
@@ -217,14 +177,6 @@ class CheckpointDirtyPages {
public int size() {
return toPosition - fromPosition;
}
-
- private CheckpointDirtyPages owner() {
- return CheckpointDirtyPages.this;
- }
-
- private boolean needsNextRegion() {
- return toPosition == dirtyPages.get(regionIndex).dirtyPages.length;
- }
}
private static boolean equalsByGroupAndPartition(FullPageId pageId0,
FullPageId pageId1) {
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 4e9c38f46a..4d85b72087 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -317,7 +317,18 @@ public class CheckpointManager {
CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture =
filePageStore.getOrCreateNewDeltaFile(
index ->
filePageStoreManager.tmpDeltaFilePageStorePath(pageId.groupId(),
pageId.partitionId(), index),
- () ->
pageIndexesForDeltaFilePageStore(pagesToWrite.getPartitionView(pageMemory,
pageId.groupId(), pageId.partitionId()))
+ () -> {
+ CheckpointDirtyPagesView partitionView =
pagesToWrite.getPartitionView(
+ pageMemory,
+ pageId.groupId(),
+ pageId.partitionId()
+ );
+
+ assert partitionView != null : String.format("Unable to
find view for dirty pages: [patitionId=%s, pageMemory=%s]",
+ GroupPartitionId.convert(pageId), pageMemory);
+
+ return pageIndexesForDeltaFilePageStore(partitionView);
+ }
);
deltaFilePageStoreFuture.join().write(pageId.pageId(), pageBuf,
calculateCrc);
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index e1ae8085e7..bf93a24494 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
import static
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId;
import static
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
-import static
org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.EMPTY;
import static org.apache.ignite.internal.util.StringUtils.hexLong;
import java.nio.ByteBuffer;
@@ -31,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.Partition
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
import org.apache.ignite.internal.pagememory.persistence.io.PartitionMetaIo;
import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue.Result;
@@ -55,6 +56,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Implementation of page writer which able to store pages to disk during
checkpoint.
*/
+// TODO: IGNITE-23203 Write retry pages in multiple threads
public class CheckpointPagesWriter implements Runnable {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(CheckpointPagesWriter.class);
@@ -63,25 +65,20 @@ public class CheckpointPagesWriter implements Runnable {
* Size of a batch of pages that we drain from a single checkpoint buffer
at the same time. The value of {@code 10} is chosen
* arbitrarily. We may reconsider it in <a
href="https://issues.apache.org/jira/browse/IGNITE-23106">IGNITE-23106</a> if
necessary.
*
- * @see #drainCheckpointBuffers(ByteBuffer, Map)
+ * @see #drainCheckpointBuffers(ByteBuffer)
*/
private static final int CP_BUFFER_PAGES_BATCH_THRESHOLD = 10;
/** Checkpoint specific metrics tracker. */
private final CheckpointMetricsTracker tracker;
- /**
- * Queue of dirty page IDs to write under this task.
- *
- * <p>Overall pages to write may be greater than this queue, since it may
be necessary to retire write some pages due to unsuccessful
- * page write lock acquisition
- */
- private final IgniteConcurrentMultiPairQueue<PersistentPageMemory,
FullPageId> writePageIds;
+ /** Queue of dirty partition IDs to write under this task. */
+ private final IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> dirtyPartitionQueue;
/**
* List of {@link PersistentPageMemory} instances that have dirty
partitions in current checkpoint.
*
- * @see #drainCheckpointBuffers(ByteBuffer, Map)
+ * @see #drainCheckpointBuffers(ByteBuffer)
*/
private final List<PersistentPageMemory> pageMemoryList;
@@ -116,7 +113,7 @@ public class CheckpointPagesWriter implements Runnable {
* Creates task for write pages.
*
* @param tracker Checkpoint metrics tracker.
- * @param writePageIds Queue of dirty page IDs to write.
+ * @param dirtyPartitionQueue Queue of dirty partition IDs to write.
* @param pageMemoryList List of {@link PersistentPageMemory} instances
that have dirty partitions in current checkpoint.
* @param updatedPartitions Updated partitions.
* @param doneFut Done future.
@@ -130,7 +127,7 @@ public class CheckpointPagesWriter implements Runnable {
*/
CheckpointPagesWriter(
CheckpointMetricsTracker tracker,
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds,
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> dirtyPartitionQueue,
List<PersistentPageMemory> pageMemoryList,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneFut,
@@ -143,7 +140,7 @@ public class CheckpointPagesWriter implements Runnable {
BooleanSupplier shutdownNow
) {
this.tracker = tracker;
- this.writePageIds = writePageIds;
+ this.dirtyPartitionQueue = dirtyPartitionQueue;
this.pageMemoryList = pageMemoryList;
this.updatedPartitions = updatedPartitions;
this.doneFut = doneFut;
@@ -159,15 +156,26 @@ public class CheckpointPagesWriter implements Runnable {
@Override
public void run() {
try {
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
pageIdsToRetry = writePages(writePageIds);
+ Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new
HashMap<>();
- while (!pageIdsToRetry.isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Checkpoint pages were not written yet due to "
- + "unsuccessful page write lock acquisition and
will be retried [pageCount={}]", pageIdsToRetry.size());
- }
+ ByteBuffer tmpWriteBuf = threadBuf.get();
+
+ var queueResult = new Result<PersistentPageMemory,
GroupPartitionId>();
+
+ while (!shutdownNow.getAsBoolean() &&
dirtyPartitionQueue.next(queueResult)) {
+ updateHeartbeat.run();
+
+ PersistentPageMemory pageMemory = queueResult.getKey();
+
+ PageStoreWriter pageStoreWriter =
createPageStoreWriter(pageMemory, pageIdsToRetry);
+
+ writeDirtyPages(pageMemory, queueResult.getValue(),
tmpWriteBuf, pageStoreWriter);
+ }
- pageIdsToRetry = writePages(pageIdsToRetry);
+ while (!shutdownNow.getAsBoolean() && !pageIdsToRetry.isEmpty()) {
+ updateHeartbeat.run();
+
+ pageIdsToRetry = writeRetryDirtyPages(pageIdsToRetry,
tmpWriteBuf);
}
doneFut.complete(null);
@@ -176,90 +184,105 @@ public class CheckpointPagesWriter implements Runnable {
}
}
- /**
- * Writes dirty pages.
- *
- * @param writePageIds Queue of dirty page IDs to write.
- * @return pagesToRetry Queue dirty page IDs which should be retried.
- */
- private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePages(
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds
+ private void writeDirtyPages(
+ PersistentPageMemory pageMemory,
+ GroupPartitionId partitionId,
+ ByteBuffer tmpWriteBuf,
+ PageStoreWriter pageStoreWriter
) throws IgniteInternalCheckedException {
- Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new
HashMap<>();
+ CheckpointDirtyPagesView checkpointDirtyPagesView =
checkpointDirtyPagesView(pageMemory, partitionId);
- Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new
HashMap<>();
+ checkpointProgress.onStartPartitionProcessing(partitionId);
- // Page store writers for checkpoint buffer pages are located in a
separate map, because they would not add elements to a
- // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write
lock acquisitions. It's implemented this way in order to
- // avoid duplicates in "pageIdsToRetry", there must only be a single
source of pages to achieve that.
- Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters =
new HashMap<>();
+ try {
+ if (shouldWriteMetaPage(partitionId)) {
+ writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
+ }
- ByteBuffer tmpWriteBuf = threadBuf.get();
+ for (int i = 0; i < checkpointDirtyPagesView.size() &&
!shutdownNow.getAsBoolean(); i++) {
+ updateHeartbeat.run();
- Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
+ FullPageId pageId = checkpointDirtyPagesView.get(i);
- GroupPartitionId partitionId = null;
+ if (pageId.pageIdx() == 0) {
+ // Skip meta-pages, they are written by
"writePartitionMeta".
+ continue;
+ }
- try {
- // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to
write file per thread.
- while (!shutdownNow.getAsBoolean() &&
writePageIds.next(queueResult)) {
- updateHeartbeat.run();
+ writeDirtyPage(pageMemory, pageId, tmpWriteBuf,
pageStoreWriter);
+ }
+ } finally {
+ checkpointProgress.onFinishPartitionProcessing(partitionId);
+ }
+ }
- FullPageId fullId = queueResult.getValue();
+ private void writeDirtyPage(
+ PersistentPageMemory pageMemory,
+ FullPageId pageId,
+ ByteBuffer tmpWriteBuf,
+ PageStoreWriter pageStoreWriter
+ ) throws IgniteInternalCheckedException {
+ // Should also be done for partitions that will be destroyed to remove
their pages from the data region.
+ pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(),
pageStoreWriter, tracker);
- PersistentPageMemory pageMemory = queueResult.getKey();
+ drainCheckpointBuffers(tmpWriteBuf);
+ }
- if (hasPartitionChanged(partitionId, fullId)) {
- GroupPartitionId newPartitionId = toPartitionId(fullId);
+ private Map<PersistentPageMemory, List<FullPageId>> writeRetryDirtyPages(
+ Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry,
+ ByteBuffer tmpWriteBuf
+ ) throws IgniteInternalCheckedException {
+ if (LOG.isInfoEnabled()) {
+ int pageCount =
pageIdsToRetry.values().stream().mapToInt(List::size).sum();
- // Starting for the new partition.
-
checkpointProgress.onStartPartitionProcessing(newPartitionId);
+ LOG.info("Checkpoint pages were not written yet due to "
+ + "unsuccessful page write lock acquisition and will be
retried [pageCount={}]", pageCount);
+ }
- if (partitionId != null) {
- // Finishing for the previous partition.
- // TODO
https://issues.apache.org/jira/browse/IGNITE-23105 Reimplement partition
destruction awaiting.
-
checkpointProgress.onFinishPartitionProcessing(partitionId);
- }
+ var newPageIdsToRetry = new HashMap<PersistentPageMemory,
List<FullPageId>>();
- partitionId = newPartitionId;
+ for (Entry<PersistentPageMemory, List<FullPageId>> entry :
pageIdsToRetry.entrySet()) {
+ PersistentPageMemory pageMemory = entry.getKey();
- if (shouldWriteMetaPage(partitionId)) {
- writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
+ PageStoreWriter pageStoreWriter =
createPageStoreWriter(pageMemory, newPageIdsToRetry);
+
+ GroupPartitionId partitionId = null;
+
+ try {
+ for (FullPageId pageId : entry.getValue()) {
+ if (shutdownNow.getAsBoolean()) {
+ return Map.of();
}
- }
- PageStoreWriter pageStoreWriter =
pageStoreWriters.computeIfAbsent(
- pageMemory,
- pm -> createPageStoreWriter(pm, pageIdsToRetry)
- );
+ updateHeartbeat.run();
+
+ if (partitionIdChanged(partitionId, pageId)) {
+ if (partitionId != null) {
+
checkpointProgress.onFinishPartitionProcessing(partitionId);
+ }
- if (fullId.pageIdx() == 0) {
- // Skip meta-pages, they are written by
"writePartitionMeta".
- continue;
- }
+ partitionId = GroupPartitionId.convert(pageId);
- // Should also be done for partitions that will be destroyed
to remove their pages from the data region.
- pageMemory.checkpointWritePage(fullId, tmpWriteBuf.rewind(),
pageStoreWriter, tracker);
+
checkpointProgress.onStartPartitionProcessing(partitionId);
+ }
- drainCheckpointBuffers(tmpWriteBuf, cpBufferPageStoreWriters);
- }
- } finally {
- if (partitionId != null) {
- checkpointProgress.onFinishPartitionProcessing(partitionId);
+ writeDirtyPage(pageMemory, pageId, tmpWriteBuf,
pageStoreWriter);
+ }
+ } finally {
+ if (partitionId != null) {
+
checkpointProgress.onFinishPartitionProcessing(partitionId);
+ }
}
}
- return pageIdsToRetry.isEmpty() ? EMPTY : new
IgniteConcurrentMultiPairQueue<>(pageIdsToRetry);
+ return newPageIdsToRetry;
}
/**
* Checkpoints parts of checkpoint buffers if they are close to overflow.
Uses
* {@link PersistentPageMemory#isCpBufferOverflowThresholdExceeded()} to
detect that.
*/
- private void drainCheckpointBuffers(
- ByteBuffer tmpWriteBuf,
- Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters
- ) throws IgniteInternalCheckedException {
+ private void drainCheckpointBuffers(ByteBuffer tmpWriteBuf) throws
IgniteInternalCheckedException {
PageStoreWriter pageStoreWriter;
boolean retry = true;
@@ -288,16 +311,13 @@ public class CheckpointPagesWriter implements Runnable {
break;
}
- GroupPartitionId partitionId = toPartitionId(cpPageId);
+ GroupPartitionId partitionId =
GroupPartitionId.convert(cpPageId);
if (shouldWriteMetaPage(partitionId)) {
writePartitionMeta(pageMemory, partitionId,
tmpWriteBuf.rewind());
}
- pageStoreWriter = pageStoreWriters.computeIfAbsent(
- pageMemory,
- pm -> createPageStoreWriter(pm, null)
- );
+ pageStoreWriter = createPageStoreWriter(pageMemory, null);
pageMemory.checkpointWritePage(cpPageId,
tmpWriteBuf.rewind(), pageStoreWriter, tracker);
}
@@ -350,7 +370,7 @@ public class CheckpointPagesWriter implements Runnable {
pageWriter.write(pageMemory, fullPageId, buf);
- updatedPartitions.get(toPartitionId(fullPageId)).increment();
+
updatedPartitions.get(GroupPartitionId.convert(fullPageId)).increment();
};
}
@@ -381,11 +401,24 @@ public class CheckpointPagesWriter implements Runnable {
updateHeartbeat.run();
}
- private static boolean hasPartitionChanged(@Nullable GroupPartitionId
partitionId, FullPageId pageId) {
- return partitionId == null || partitionId.getPartitionId() !=
pageId.partitionId() || partitionId.getGroupId() != pageId.groupId();
+ private CheckpointDirtyPagesView
checkpointDirtyPagesView(PersistentPageMemory pageMemory, GroupPartitionId
partitionId) {
+ CheckpointDirtyPages checkpointDirtyPages =
checkpointProgress.pagesToWrite();
+
+ assert checkpointDirtyPages != null;
+
+ CheckpointDirtyPagesView partitionView =
checkpointDirtyPages.getPartitionView(
+ pageMemory,
+ partitionId.getGroupId(),
+ partitionId.getPartitionId()
+ );
+
+ assert partitionView != null : String.format("Unable to find view for
dirty pages: [patitionId=%s, pageMemory=%s]", partitionId,
+ pageMemory);
+
+ return partitionView;
}
- private static GroupPartitionId toPartitionId(FullPageId pageId) {
- return new GroupPartitionId(pageId.groupId(), pageId.partitionId());
+ private static boolean partitionIdChanged(@Nullable GroupPartitionId
partitionId, FullPageId pageId) {
+ return partitionId == null || partitionId.getGroupId() !=
pageId.groupId() || partitionId.getPartitionId() != pageId.partitionId();
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index 301313ccd7..c0c05e5ca1 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
-import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
@@ -82,7 +81,7 @@ public class CheckpointPagesWriterFactory {
* Returns instance of page checkpoint writer.
*
* @param tracker Checkpoint metrics tracker.
- * @param dirtyPageIdQueue Checkpoint dirty page ID queue to write.
+ * @param dirtyPartitionQueue Checkpoint dirty partition ID queue to write.
* @param pageMemoryList List of {@link PersistentPageMemory} instances
that have dirty partitions in current checkpoint.
* @param updatedPartitions Updated partitions.
* @param doneWriteFut Write done future.
@@ -92,7 +91,7 @@ public class CheckpointPagesWriterFactory {
*/
CheckpointPagesWriter build(
CheckpointMetricsTracker tracker,
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
dirtyPageIdQueue,
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> dirtyPartitionQueue,
List<PersistentPageMemory> pageMemoryList,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneWriteFut,
@@ -103,7 +102,7 @@ public class CheckpointPagesWriterFactory {
) {
return new CheckpointPagesWriter(
tracker,
- dirtyPageIdQueue,
+ dirtyPartitionQueue,
pageMemoryList,
updatedPartitions,
doneWriteFut,
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index bc074fc851..ce5a4c9dbc 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -37,6 +37,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.CollectionUtils;
@@ -409,13 +411,15 @@ class CheckpointWorkflow {
CheckpointDirtyPages createAndSortCheckpointDirtyPages(
DataRegionsDirtyPages dataRegionsDirtyPages
) throws IgniteInternalCheckedException {
- List<DataRegionDirtyPages<FullPageId[]>> checkpointDirtyPages = new
ArrayList<>();
+ var checkpointDirtyPages = new ArrayList<DirtyPagesAndPartitions>();
int realPagesArrSize = 0;
- // Collect arrays of dirty pages for sorting.
+ // Collects dirty pages into an array (then we will sort them) and
collects dirty partitions.
for (DataRegionDirtyPages<Collection<FullPageId>> dataRegionDirtyPages
: dataRegionsDirtyPages.dirtyPages) {
- FullPageId[] pageIds = new
FullPageId[dataRegionDirtyPages.dirtyPages.size()];
+ var pageIds = new
FullPageId[dataRegionDirtyPages.dirtyPages.size()];
+
+ var partitionIds = new HashSet<GroupPartitionId>();
int pagePos = 0;
@@ -424,6 +428,7 @@ class CheckpointWorkflow {
"Incorrect estimated dirty pages number: " +
dataRegionsDirtyPages.dirtyPageCount;
pageIds[pagePos++] = dirtyPage;
+ partitionIds.add(GroupPartitionId.convert(dirtyPage));
}
// Some pages may have been already replaced.
@@ -433,20 +438,20 @@ class CheckpointWorkflow {
pageIds = Arrays.copyOf(pageIds, pagePos);
}
- checkpointDirtyPages.add(new
DataRegionDirtyPages<>(dataRegionDirtyPages.pageMemory, pageIds));
+ checkpointDirtyPages.add(new
DirtyPagesAndPartitions(dataRegionDirtyPages.pageMemory, pageIds,
partitionIds));
}
// Add tasks to sort arrays of dirty page IDs in parallel if their
number is greater than or equal to PARALLEL_SORT_THRESHOLD.
List<ForkJoinTask<?>> parallelSortTasks = checkpointDirtyPages.stream()
- .map(dataRegionDirtyPages -> dataRegionDirtyPages.dirtyPages)
+ .map(dirtyPagesAndPartitions ->
dirtyPagesAndPartitions.dirtyPages)
.filter(pageIds -> pageIds.length >= PARALLEL_SORT_THRESHOLD)
.map(pageIds -> parallelSortThreadPool.submit(() ->
Arrays.parallelSort(pageIds, DIRTY_PAGE_COMPARATOR)))
.collect(toList());
// Sort arrays of dirty page IDs if their number is less than
PARALLEL_SORT_THRESHOLD.
- for (DataRegionDirtyPages<FullPageId[]> dataRegionDirtyPages :
checkpointDirtyPages) {
- if (dataRegionDirtyPages.dirtyPages.length <
PARALLEL_SORT_THRESHOLD) {
- Arrays.sort(dataRegionDirtyPages.dirtyPages,
DIRTY_PAGE_COMPARATOR);
+ for (DirtyPagesAndPartitions dirtyPagesAndPartitions :
checkpointDirtyPages) {
+ if (dirtyPagesAndPartitions.dirtyPages.length <
PARALLEL_SORT_THRESHOLD) {
+ Arrays.sort(dirtyPagesAndPartitions.dirtyPages,
DIRTY_PAGE_COMPARATOR);
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 39a7bbfe69..4b8c5457d8 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -57,7 +57,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.DataRegion;
-import org.apache.ignite.internal.pagememory.FullPageId;
import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
@@ -458,12 +457,13 @@ public class Checkpointer extends IgniteWorker {
List<PersistentPageMemory> pageMemoryList =
checkpointDirtyPages.dirtyPageMemoryInstances();
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds = checkpointDirtyPages.toDirtyPageIdQueue();
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory, GroupPartitionId>
dirtyPartitionQueue
+ = checkpointDirtyPages.toDirtyPartitionQueue();
for (int i = 0; i < checkpointWritePageThreads; i++) {
CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
tracker,
- writePageIds,
+ dirtyPartitionQueue,
pageMemoryList,
updatedPartitions,
futures[i] = new CompletableFuture<>(),
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DirtyPagesAndPartitions.java
similarity index 51%
copy from
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
copy to
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DirtyPagesAndPartitions.java
index 6f5f6c3fc5..eadbbd2802 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/DirtyPagesAndPartitions.java
@@ -17,32 +17,22 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.List;
+import java.util.Set;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.Test;
-/**
- * For {@link Checkpoint} testing.
- */
-public class CheckpointTest extends BaseIgniteAbstractTest {
- @Test
- void testHasDelta() {
- CheckpointProgressImpl progress = mock(CheckpointProgressImpl.class);
+/** Container of dirty pages and partitions. */
+class DirtyPagesAndPartitions {
+ final PersistentPageMemory pageMemory;
- assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
+ final FullPageId[] dirtyPages;
- DataRegionDirtyPages<FullPageId[]> biTuple = new
DataRegionDirtyPages<>(
- mock(PersistentPageMemory.class),
- new FullPageId[]{new FullPageId(0, 1)}
- );
+ final Set<GroupPartitionId> dirtyPartitions;
- assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)),
progress).hasDelta());
+ DirtyPagesAndPartitions(PersistentPageMemory pageMemory, FullPageId[]
dirtyPages, Set<GroupPartitionId> dirtyPartitions) {
+ this.pageMemory = pageMemory;
+ this.dirtyPages = dirtyPages;
+ this.dirtyPartitions = dirtyPartitions;
}
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
index 415d97fc71..05c1647897 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java
@@ -30,13 +30,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
import org.apache.ignite.internal.pagememory.util.PageIdUtils;
@@ -51,8 +51,8 @@ import org.junit.jupiter.api.Test;
public class CheckpointDirtyPagesTest extends BaseIgniteAbstractTest {
@Test
void testDirtyPagesCount() {
- DataRegionDirtyPages<FullPageId[]> dirtyPages0 =
createDirtyPages(of(0, 0, 0), of(0, 0, 1));
- DataRegionDirtyPages<FullPageId[]> dirtyPages1 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
+ DirtyPagesAndPartitions dirtyPages0 =
createDirtyPagesAndPartitions(of(0, 0, 0), of(0, 0, 1));
+ DirtyPagesAndPartitions dirtyPages1 =
createDirtyPagesAndPartitions(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
assertEquals(0, EMPTY.dirtyPagesCount());
assertEquals(2, new
CheckpointDirtyPages(List.of(dirtyPages0)).dirtyPagesCount());
@@ -61,18 +61,18 @@ public class CheckpointDirtyPagesTest extends
BaseIgniteAbstractTest {
}
@Test
- void testToDirtyPageIdQueue() {
- assertTrue(EMPTY.toDirtyPageIdQueue().isEmpty());
+ void testToDirtyPartitionQueue() {
+ assertTrue(EMPTY.toDirtyPartitionQueue().isEmpty());
- DataRegionDirtyPages<FullPageId[]> dirtyPages0 =
createDirtyPages(of(0, 0, 0));
- DataRegionDirtyPages<FullPageId[]> dirtyPages1 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
- DataRegionDirtyPages<FullPageId[]> dirtyPages2 =
createDirtyPages(of(2, 0, 0), of(2, 1, 0), of(3, 2, 2));
+ DirtyPagesAndPartitions dirtyPages0 =
createDirtyPagesAndPartitions(of(0, 0, 0));
+ DirtyPagesAndPartitions dirtyPages1 =
createDirtyPagesAndPartitions(of(1, 0, 0), of(1, 0, 1));
+ DirtyPagesAndPartitions dirtyPages2 =
createDirtyPagesAndPartitions(of(2, 0, 0), of(2, 1, 0), of(3, 2, 2));
- CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2));
+ var checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2));
assertThat(
- toListPair(checkpointDirtyPages.toDirtyPageIdQueue()),
- equalTo(toListPair(dirtyPages0, dirtyPages1, dirtyPages2))
+ toListPair(checkpointDirtyPages.toDirtyPartitionQueue()),
+ equalTo(toListDirtyPartitionPair(dirtyPages0, dirtyPages1,
dirtyPages2))
);
}
@@ -80,16 +80,16 @@ public class CheckpointDirtyPagesTest extends
BaseIgniteAbstractTest {
void testGetPartitionViewByPageMemory() {
assertThrows(IllegalArgumentException.class, () ->
EMPTY.getPartitionView(mock(PersistentPageMemory.class), 0, 0));
- DataRegionDirtyPages<FullPageId[]> dirtyPages0 =
createDirtyPages(of(0, 0, 0));
- DataRegionDirtyPages<FullPageId[]> dirtyPages1 =
createDirtyPages(of(5, 0, 0));
- DataRegionDirtyPages<FullPageId[]> dirtyPages2 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
- DataRegionDirtyPages<FullPageId[]> dirtyPages3 = createDirtyPages(
+ DirtyPagesAndPartitions dirtyPages0 =
createDirtyPagesAndPartitions(of(0, 0, 0));
+ DirtyPagesAndPartitions dirtyPages1 =
createDirtyPagesAndPartitions(of(5, 0, 0));
+ DirtyPagesAndPartitions dirtyPages2 =
createDirtyPagesAndPartitions(of(1, 0, 0), of(1, 0, 1));
+ DirtyPagesAndPartitions dirtyPages3 = createDirtyPagesAndPartitions(
of(2, 0, 0), of(2, 0, 1),
of(2, 1, 1),
of(3, 2, 2), of(3, 2, 3)
);
- CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2,
dirtyPages3));
+ var checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2,
dirtyPages3));
assertNull(checkpointDirtyPages.getPartitionView(dirtyPages0.pageMemory, 4, 0));
assertNull(checkpointDirtyPages.getPartitionView(dirtyPages1.pageMemory, 4, 0));
@@ -99,83 +99,36 @@ public class CheckpointDirtyPagesTest extends
BaseIgniteAbstractTest {
assertNull(checkpointDirtyPages.getPartitionView(dirtyPages2.pageMemory, 5, 0));
assertThat(
-
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages0.pageMemory, 0, 0)),
- equalTo(toListPair(dirtyPages0))
+
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages0.pageMemory,
0, 0)),
+ equalTo(toListDirtyPagePair(dirtyPages0))
);
assertThat(
-
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages1.pageMemory, 5, 0)),
- equalTo(toListPair(dirtyPages1))
+
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages1.pageMemory,
5, 0)),
+ equalTo(toListDirtyPagePair(dirtyPages1))
);
assertThat(
-
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages2.pageMemory, 1, 0)),
- equalTo(toListPair(dirtyPages2))
+
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages2.pageMemory,
1, 0)),
+ equalTo(toListDirtyPagePair(dirtyPages2))
);
assertThat(
-
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory, 2, 0)),
- equalTo(toListPair(equalsByGroupAndPartition(2, 0),
dirtyPages3))
+
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory,
2, 0)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 0),
dirtyPages3))
);
assertThat(
-
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory, 2, 1)),
- equalTo(toListPair(equalsByGroupAndPartition(2, 1),
dirtyPages3))
+
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory,
2, 1)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(2, 1),
dirtyPages3))
);
assertThat(
-
toListPair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory, 3, 2)),
- equalTo(toListPair(equalsByGroupAndPartition(3, 2),
dirtyPages3))
+
toListDirtyPagePair(checkpointDirtyPages.getPartitionView(dirtyPages3.pageMemory,
3, 2)),
+ equalTo(toListDirtyPagePair(equalsByGroupAndPartition(3, 2),
dirtyPages3))
);
}
- @Test
- void testNextPartitionView() {
- assertNull(EMPTY.nextPartitionView(null));
-
- DataRegionDirtyPages<FullPageId[]> dirtyPages0 =
createDirtyPages(of(0, 0, 0));
- DataRegionDirtyPages<FullPageId[]> dirtyPages1 =
createDirtyPages(of(5, 0, 0));
- DataRegionDirtyPages<FullPageId[]> dirtyPages2 =
createDirtyPages(of(1, 0, 0), of(1, 0, 1));
- DataRegionDirtyPages<FullPageId[]> dirtyPages3 = createDirtyPages(
- of(2, 0, 0), of(2, 0, 1),
- of(2, 1, 1),
- of(3, 2, 2), of(3, 2, 3)
- );
-
- CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(dirtyPages0, dirtyPages1, dirtyPages2,
dirtyPages3));
-
- CheckpointDirtyPagesView view =
checkpointDirtyPages.nextPartitionView(null);
-
- assertThat(toListPair(view), equalTo(toListPair(dirtyPages0)));
-
- assertThat(
- toListPair(view =
checkpointDirtyPages.nextPartitionView(view)),
- equalTo(toListPair(dirtyPages1))
- );
-
- assertThat(
- toListPair(view =
checkpointDirtyPages.nextPartitionView(view)),
- equalTo(toListPair(dirtyPages2))
- );
-
- assertThat(
- toListPair(view =
checkpointDirtyPages.nextPartitionView(view)),
- equalTo(toListPair(equalsByGroupAndPartition(2, 0),
dirtyPages3))
- );
-
- assertThat(
- toListPair(view =
checkpointDirtyPages.nextPartitionView(view)),
- equalTo(toListPair(equalsByGroupAndPartition(2, 1),
dirtyPages3))
- );
-
- assertThat(
- toListPair(view =
checkpointDirtyPages.nextPartitionView(view)),
- equalTo(toListPair(equalsByGroupAndPartition(3, 2),
dirtyPages3))
- );
-
- assertNull(checkpointDirtyPages.nextPartitionView(view));
- }
-
@Test
void testSortDirtyPageIds() {
List<FullPageId> pageIds = new ArrayList<>(List.of(
@@ -196,10 +149,8 @@ public class CheckpointDirtyPagesTest extends
BaseIgniteAbstractTest {
);
}
- private static DataRegionDirtyPages<FullPageId[]>
createDirtyPages(FullPageId... pageIds) {
- Arrays.sort(pageIds, DIRTY_PAGE_COMPARATOR);
-
- return new DataRegionDirtyPages<>(mock(PersistentPageMemory.class),
pageIds);
+ private static DirtyPagesAndPartitions
createDirtyPagesAndPartitions(FullPageId... pageIds) {
+ return
TestCheckpointUtils.createDirtyPagesAndPartitions(mock(PersistentPageMemory.class),
pageIds);
}
private static FullPageId of(int groupId, int partId, int pageIdx) {
@@ -220,13 +171,21 @@ public class CheckpointDirtyPagesTest extends
BaseIgniteAbstractTest {
.collect(toList());
}
- private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListPair(DataRegionDirtyPages<FullPageId[]>... dirtyPages) {
- return toListPair(dirtyPageId -> true, dirtyPages);
+ private static List<IgniteBiTuple<PersistentPageMemory, GroupPartitionId>>
toListDirtyPartitionPair(
+ DirtyPagesAndPartitions... dirtyPagesAndPartitions
+ ) {
+ return Stream.of(dirtyPagesAndPartitions)
+ .flatMap(pages ->
pages.dirtyPartitions.stream().map(partitionId -> new
IgniteBiTuple<>(pages.pageMemory, partitionId)))
+ .collect(toList());
+ }
+
+ private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListDirtyPagePair(DirtyPagesAndPartitions... dirtyPages) {
+ return toListDirtyPagePair(dirtyPageId -> true, dirtyPages);
}
- private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListPair(
+ private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListDirtyPagePair(
Predicate<FullPageId> predicate,
- DataRegionDirtyPages<FullPageId[]>... dirtyPages
+ DirtyPagesAndPartitions... dirtyPages
) {
return Stream.of(dirtyPages)
.flatMap(pages -> Stream.of(pages.dirtyPages)
@@ -236,7 +195,7 @@ public class CheckpointDirtyPagesTest extends
BaseIgniteAbstractTest {
.collect(toList());
}
- private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListPair(CheckpointDirtyPagesView view) {
+ private static List<IgniteBiTuple<PersistentPageMemory, FullPageId>>
toListDirtyPagePair(CheckpointDirtyPagesView view) {
return IntStream.range(0, view.size()).mapToObj(i -> new
IgniteBiTuple<>(view.pageMemory(), view.get(i))).collect(toList());
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
index 76d6dd29b6..d21fcfc89a 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgenc
import static
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.SHOULD_TRIGGER;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.checkpointUrgency;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager.pageIndexesForDeltaFilePageStore;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -146,9 +147,9 @@ public class CheckpointManagerTest extends
BaseIgniteAbstractTest {
PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
- CheckpointDirtyPages dirtyPages = new CheckpointDirtyPages(List.of(
- new DataRegionDirtyPages<>(pageMemory0, dirtyPageArray(0, 0,
1)),
- new DataRegionDirtyPages<>(pageMemory1, dirtyPageArray(0, 1,
2, 3, 4))
+ var dirtyPages = new CheckpointDirtyPages(List.of(
+ createDirtyPagesAndPartitions(pageMemory0, dirtyPageArray(0,
0, 1)),
+ createDirtyPagesAndPartitions(pageMemory1, dirtyPageArray(0,
1, 2, 3, 4))
));
assertArrayEquals(new int[]{0, 1},
pageIndexesForDeltaFilePageStore(dirtyPages.getPartitionView(pageMemory0, 0,
0)));
@@ -160,9 +161,9 @@ public class CheckpointManagerTest extends
BaseIgniteAbstractTest {
PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
- CheckpointDirtyPages dirtyPages = new CheckpointDirtyPages(List.of(
- new DataRegionDirtyPages<>(pageMemory0, dirtyPageArray(0, 0,
0, 1)),
- new DataRegionDirtyPages<>(pageMemory1, dirtyPageArray(0, 1,
0, 2, 3, 4))
+ var dirtyPages = new CheckpointDirtyPages(List.of(
+ createDirtyPagesAndPartitions(pageMemory0, dirtyPageArray(0,
0, 0, 1)),
+ createDirtyPagesAndPartitions(pageMemory1, dirtyPageArray(0,
1, 0, 2, 3, 4))
));
assertArrayEquals(new int[]{0, 1},
pageIndexesForDeltaFilePageStore(dirtyPages.getPartitionView(pageMemory0, 0,
0)));
@@ -209,9 +210,7 @@ public class CheckpointManagerTest extends
BaseIgniteAbstractTest {
CheckpointProgress checkpointProgress = mock(CheckpointProgress.class);
- CheckpointDirtyPages dirtyPages = new CheckpointDirtyPages(List.of(
- new DataRegionDirtyPages<>(pageMemory, new
FullPageId[]{dirtyPageId})
- ));
+ var dirtyPages = new
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory,
dirtyPageId)));
when(checkpointProgress.inProgress()).thenReturn(true);
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index 97d5dfdb04..94c305aac6 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.TRY_AGAIN_TAG;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.createPartitionMetaManager;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
@@ -101,9 +102,9 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
FullPageId fullPageId5 = new FullPageId(pageId(0, FLAG_DATA, 5), 0);
FullPageId fullPageId6 = new FullPageId(pageId(1, FLAG_DATA, 6), 0);
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds = new IgniteConcurrentMultiPairQueue<>(
- Map.of(pageMemory, List.of(fullPageId1, fullPageId2,
fullPageId3, fullPageId4, fullPageId5, fullPageId6))
- );
+ CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(
+ createDirtyPagesAndPartitions(pageMemory, fullPageId1,
fullPageId2, fullPageId3, fullPageId4, fullPageId5, fullPageId6)
+ ));
GroupPartitionId groupPartId0 = groupPartId(0, 0);
GroupPartitionId groupPartId1 = groupPartId(0, 1);
@@ -123,13 +124,17 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+ progressImpl.pagesToWrite(checkpointDirtyPages);
PartitionMeta partitionMeta0 = mock(PartitionMeta.class);
PartitionMeta partitionMeta1 = mock(PartitionMeta.class);
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory, GroupPartitionId>
dirtyPartitionQueue
+ = checkpointDirtyPages.toDirtyPartitionQueue();
+
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
tracker,
- writePageIds,
+ dirtyPartitionQueue,
singletonList(pageMemory),
updatedPartitions,
doneFuture,
@@ -148,7 +153,7 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
assertDoesNotThrow(() -> doneFuture.get(1, TimeUnit.SECONDS));
- assertTrue(writePageIds.isEmpty());
+ assertTrue(dirtyPartitionQueue.isEmpty());
assertThat(updatedPartitions.keySet(),
containsInAnyOrder(groupPartId0, groupPartId1));
@@ -162,17 +167,20 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
writtenFullPageIds.getAllValues(),
equalTo(List.of(
// At the beginning, we write the partition meta for
each new partition.
- fullPageId(0, 0, 0),
+ new FullPageId(pageId(0, FLAG_AUX, 0), 0),
// Order is different because the first 3 pages we
have to try to write to the page store 2 times.
fullPageId4, fullPageId5,
- fullPageId(0, 1, 0),
- fullPageId6, fullPageId1, fullPageId2, fullPageId3
+ // At the beginning, we write the partition meta for
each new partition.
+ new FullPageId(pageId(1, FLAG_AUX, 0), 0),
+ fullPageId6,
+ // Now the retry pages.
+ fullPageId1, fullPageId2, fullPageId3
))
);
- verify(beforePageWrite, times(11)).run();
+ verify(beforePageWrite, times(14)).run();
- verify(threadBuf, times(2)).get();
+ verify(threadBuf, times(1)).get();
verify(partitionMeta0, times(1)).metaSnapshot(any(UUID.class));
verify(partitionMeta1, times(1)).metaSnapshot(any(UUID.class));
@@ -195,15 +203,22 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
GroupPartitionId groupPartId = groupPartId(0, 0);
+ CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(
+ createDirtyPagesAndPartitions(pageMemory, new
FullPageId(pageId(0, FLAG_DATA, 1), 0))
+ ));
+
+ CheckpointProgressImpl checkpointProgress = new
CheckpointProgressImpl(0);
+ checkpointProgress.pagesToWrite(checkpointDirtyPages);
+
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
new CheckpointMetricsTracker(),
- new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory,
List.of(fullPageId(0, 0, 1)))),
+ new IgniteConcurrentMultiPairQueue<>(Map.of(pageMemory,
List.of(new GroupPartitionId(0, 0)))),
singletonList(pageMemory),
new ConcurrentHashMap<>(),
doneFuture,
() -> {},
createThreadLocalBuffer(),
- new CheckpointProgressImpl(0),
+ checkpointProgress,
createDirtyPageWriter(null),
ioRegistry,
createPartitionMetaManager(Map.of(groupPartId,
mock(PartitionMeta.class))),
@@ -238,23 +253,29 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
any(CheckpointMetricsTracker.class)
);
- IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds = new IgniteConcurrentMultiPairQueue<>(
- Map.of(pageMemory, List.of(fullPageId(0, 0, 1), fullPageId(0,
0, 2)))
- );
+ CheckpointDirtyPages checkpointDirtyPages = new
CheckpointDirtyPages(List.of(
+ createDirtyPagesAndPartitions(pageMemory, fullPageId(0, 0, 1),
fullPageId(0, 1, 2))
+ ));
GroupPartitionId groupPartId = groupPartId(0, 0);
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions = new
ConcurrentHashMap<>();
+ CheckpointProgressImpl checkpointProgress = new
CheckpointProgressImpl(0);
+ checkpointProgress.pagesToWrite(checkpointDirtyPages);
+
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory, GroupPartitionId>
dirtyPartitionQueue
+ = checkpointDirtyPages.toDirtyPartitionQueue();
+
CheckpointPagesWriter pagesWriter = new CheckpointPagesWriter(
new CheckpointMetricsTracker(),
- writePageIds,
+ dirtyPartitionQueue,
singletonList(pageMemory),
updatedPartitions,
doneFuture,
() -> {},
createThreadLocalBuffer(),
- new CheckpointProgressImpl(0),
+ checkpointProgress,
createDirtyPageWriter(null),
ioRegistry,
createPartitionMetaManager(Map.of(groupPartId,
mock(PartitionMeta.class))),
@@ -265,7 +286,7 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
assertDoesNotThrow(() -> doneFuture.get(1, TimeUnit.SECONDS));
- assertThat(writePageIds.size(), equalTo(1));
+ assertThat(dirtyPartitionQueue.size(), equalTo(1));
assertThat(updatedPartitions.keySet(), contains(groupPartId));
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
index 6f5f6c3fc5..9facbb11de 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -38,11 +39,11 @@ public class CheckpointTest extends BaseIgniteAbstractTest {
assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
- DataRegionDirtyPages<FullPageId[]> biTuple = new
DataRegionDirtyPages<>(
+ DirtyPagesAndPartitions dirtyPagesAndPartitions =
createDirtyPagesAndPartitions(
mock(PersistentPageMemory.class),
- new FullPageId[]{new FullPageId(0, 1)}
+ new FullPageId(0, 1)
);
- assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)),
progress).hasDelta());
+ assertTrue(new Checkpoint(new
CheckpointDirtyPages(List.of(dirtyPagesAndPartitions)), progress).hasDelta());
}
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 7f670a4efa..9e45b41e4b 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -33,6 +33,7 @@ import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.Check
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
@@ -314,7 +315,7 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
verify(progressImpl,
times(1)).pagesToWrite(any(CheckpointDirtyPages.class));
verify(progressImpl, times(1)).initCounters(anyInt());
- CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.nextPartitionView(null);
+ CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.getPartitionView(pageMemory, 0, 0);
assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(dirtyPages));
assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegion.pageMemory()));
@@ -383,7 +384,7 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
workflow.addCheckpointListener(checkpointListener, dataRegion);
workflow.markCheckpointEnd(new Checkpoint(
- new
CheckpointDirtyPages(List.of(createCheckpointDirtyPages(pageMemory, of(0, 0,
0)))),
+ new
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory, of(0, 0,
0)))),
progressImpl
));
@@ -408,15 +409,18 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
@Test
void testCreateAndSortCheckpointDirtyPages() throws Exception {
+ PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
+ PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
+
DataRegionDirtyPages<Collection<FullPageId>> dataRegionDirtyPages0 =
createDataRegionDirtyPages(
- mock(PersistentPageMemory.class),
+ pageMemory0,
of(10, 10, 2), of(10, 10, 1), of(10, 10, 0),
of(10, 5, 100), of(10, 5, 99),
of(10, 1, 50), of(10, 1, 51), of(10, 1, 99)
);
DataRegionDirtyPages<Collection<FullPageId>> dataRegionDirtyPages1 =
createDataRegionDirtyPages(
- mock(PersistentPageMemory.class),
+ pageMemory1,
of(77, 5, 100), of(77, 5, 99),
of(88, 1, 51), of(88, 1, 50), of(88, 1, 99),
of(66, 33, 0), of(66, 33, 1), of(66, 33, 2)
@@ -435,32 +439,32 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
new DataRegionsDirtyPages(List.of(dataRegionDirtyPages0,
dataRegionDirtyPages1))
);
- CheckpointDirtyPagesView dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(null);
+ CheckpointDirtyPagesView dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 10, 1);
assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10,
1, 50), of(10, 1, 51), of(10, 1, 99))));
assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages0.pageMemory));
- dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+ dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 10, 5);
assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10,
5, 99), of(10, 5, 100))));
assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages0.pageMemory));
- dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+ dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 10, 10);
assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(10,
10, 0), of(10, 10, 1), of(10, 10, 2))));
assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages0.pageMemory));
- dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+ dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 66, 33);
assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(66,
33, 0), of(66, 33, 1), of(66, 33, 2))));
assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages1.pageMemory));
- dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+ dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 77, 5);
assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(77,
5, 99), of(77, 5, 100))));
assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages1.pageMemory));
- dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+ dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 88, 1);
assertThat(toListDirtyPageIds(dirtyPagesView), equalTo(List.of(of(88,
1, 50), of(88, 1, 51), of(88, 1, 99))));
assertThat(dirtyPagesView.pageMemory(),
equalTo(dataRegionDirtyPages1.pageMemory));
@@ -482,16 +486,19 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
workflow.start();
+ PersistentPageMemory pageMemory0 = mock(PersistentPageMemory.class);
+ PersistentPageMemory pageMemory1 = mock(PersistentPageMemory.class);
+
CheckpointDirtyPages sortCheckpointDirtyPages =
workflow.createAndSortCheckpointDirtyPages(new DataRegionsDirtyPages(List.of(
- createDataRegionDirtyPages(mock(PersistentPageMemory.class),
dirtyPages1),
- createDataRegionDirtyPages(mock(PersistentPageMemory.class),
dirtyPages0)
+ createDataRegionDirtyPages(pageMemory1, dirtyPages1),
+ createDataRegionDirtyPages(pageMemory0, dirtyPages0)
)));
- CheckpointDirtyPagesView dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(null);
+ CheckpointDirtyPagesView dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory1, 1, 1);
assertThat(toListDirtyPageIds(dirtyPagesView),
equalTo(List.of(dirtyPages1)));
- dirtyPagesView =
sortCheckpointDirtyPages.nextPartitionView(dirtyPagesView);
+ dirtyPagesView =
sortCheckpointDirtyPages.getPartitionView(pageMemory0, 0, 0);
assertThat(
toListDirtyPageIds(dirtyPagesView),
@@ -533,7 +540,7 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
assertEquals(1, checkpoint.dirtyPagesSize);
- CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.nextPartitionView(null);
+ CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.getPartitionView(pageMemory, groupId, partitionId);
assertNotNull(dirtyPagesView);
assertThat(toListDirtyPageIds(dirtyPagesView),
is(List.of(metaPageId)));
@@ -576,7 +583,7 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
assertEquals(2, checkpoint.dirtyPagesSize);
- CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.nextPartitionView(null);
+ CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.getPartitionView(pageMemory, groupId, partitionId);
assertNotNull(dirtyPagesView);
assertThat(toListDirtyPageIds(dirtyPagesView), is(List.of(metaPageId,
dataPageId)));
@@ -663,13 +670,6 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
return mock;
}
- private static DataRegionDirtyPages<FullPageId[]>
createCheckpointDirtyPages(
- PersistentPageMemory pageMemory,
- FullPageId... pageIds
- ) {
- return new DataRegionDirtyPages<>(pageMemory, pageIds);
- }
-
private static DataRegionDirtyPages<Collection<FullPageId>>
createDataRegionDirtyPages(
PersistentPageMemory pageMemory,
FullPageId... pageIds
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 38506b649d..976174068a 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -22,11 +22,11 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FACTORY;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.TestCheckpointUtils.createDirtyPagesAndPartitions;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -56,7 +56,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -391,7 +390,7 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
assertDoesNotThrow(checkpointer::doCheckpoint);
- verify(dirtyPages, times(1)).toDirtyPageIdQueue();
+ verify(dirtyPages, times(1)).toDirtyPartitionQueue();
verify(checkpointer, times(1)).startCheckpointProgress();
verify(compactor, times(1)).triggerCompaction();
verify(mockLogSyncer, times(1)).sync();
@@ -423,7 +422,7 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
assertDoesNotThrow(checkpointer::doCheckpoint);
- verify(dirtyPages, never()).toDirtyPageIdQueue();
+ verify(dirtyPages, never()).toDirtyPartitionQueue();
verify(checkpointer, times(1)).startCheckpointProgress();
verify(compactor, never()).triggerCompaction();
@@ -518,10 +517,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
onPartitionDestructionFuture.get(1, SECONDS);
}
- private CheckpointDirtyPages dirtyPages(PersistentPageMemory pageMemory,
FullPageId... pageIds) {
- Arrays.sort(pageIds, DIRTY_PAGE_COMPARATOR);
-
- return new CheckpointDirtyPages(List.of(new
DataRegionDirtyPages<>(pageMemory, pageIds)));
+ private static CheckpointDirtyPages dirtyPages(PersistentPageMemory
pageMemory, FullPageId... pageIds) {
+ return new
CheckpointDirtyPages(List.of(createDirtyPagesAndPartitions(pageMemory,
pageIds)));
}
private CheckpointWorkflow createCheckpointWorkflow(CheckpointDirtyPages
dirtyPages) throws Exception {
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
similarity index 54%
copy from
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
copy to
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
index 6f5f6c3fc5..217e19b01f 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/TestCheckpointUtils.java
@@ -17,32 +17,24 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
-import java.util.List;
+import java.util.Arrays;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.Test;
-/**
- * For {@link Checkpoint} testing.
- */
-public class CheckpointTest extends BaseIgniteAbstractTest {
- @Test
- void testHasDelta() {
- CheckpointProgressImpl progress = mock(CheckpointProgressImpl.class);
-
- assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
+/** Helper class for checkpoint testing that may contain useful methods and
constants. */
+class TestCheckpointUtils {
+ /** Sorts dirty pages and creates a new instance {@link
DirtyPagesAndPartitions}. */
+ static DirtyPagesAndPartitions
createDirtyPagesAndPartitions(PersistentPageMemory pageMemory, FullPageId...
dirtyPages) {
+ Arrays.sort(dirtyPages, DIRTY_PAGE_COMPARATOR);
- DataRegionDirtyPages<FullPageId[]> biTuple = new
DataRegionDirtyPages<>(
- mock(PersistentPageMemory.class),
- new FullPageId[]{new FullPageId(0, 1)}
+ return new DirtyPagesAndPartitions(
+ pageMemory,
+ dirtyPages,
+
Arrays.stream(dirtyPages).map(GroupPartitionId::convert).collect(toSet())
);
-
- assertTrue(new Checkpoint(new CheckpointDirtyPages(List.of(biTuple)),
progress).hasDelta());
}
}