This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9e87a296b56 IGNITE-27588 Adjust Checkpoint main/delta files stats
calculation (#7602)
9e87a296b56 is described below
commit 9e87a296b5676f2b9c914ff4ce69c79899c3c5d1
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Tue Feb 17 16:15:41 2026 +0300
IGNITE-27588 Adjust Checkpoint main/delta files stats calculation (#7602)
---
.../ItBplusTreePersistentPageMemoryTest.java | 4 +-
...BplusTreeReuseListPersistentPageMemoryTest.java | 4 +-
.../{WriteDirtyPage.java => PageWriteTarget.java} | 25 ++++-----
.../persistence/PersistentPageMemory.java | 2 +-
.../pagememory/persistence/WriteDirtyPage.java | 3 +-
.../persistence/checkpoint/CheckpointManager.java | 10 +++-
.../checkpoint/CheckpointPagesWriter.java | 38 ++++++++++---
.../checkpoint/CheckpointPagesWriterFactory.java | 3 +-
.../persistence/checkpoint/Checkpointer.java | 21 ++++---
.../checkpoint/PartitionWriteStats.java | 64 ++++++++++++++++++++++
.../replacement/DelayedDirtyPageWrite.java | 1 +
.../checkpoint/CheckpointPagesWriterTest.java | 20 ++++---
.../persistence/checkpoint/CheckpointerTest.java | 8 ++-
.../throttling/PageMemoryThrottlingTest.java | 5 +-
.../pagememory/PersistentPageMemoryDataRegion.java | 7 ++-
.../pagememory/PersistentPageMemoryNoLoadTest.java | 6 +-
16 files changed, 162 insertions(+), 59 deletions(-)
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
index 01f6be69a70..933d600f542 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.persistence.PageHeader;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
@@ -73,8 +74,7 @@ public class ItBplusTreePersistentPageMemoryTest extends
AbstractBplusTreePageMe
LongStream.range(0, CPUS).map(i -> MAX_MEMORY_SIZE /
CPUS).toArray(),
10 * MiB,
new TestPageReadWriteManager(),
- (fullPageId, buf, tag) -> {
- },
+ (fullPageId, buf, tag) -> PageWriteTarget.NONE,
mockCheckpointTimeoutLock(true),
wrapLock(offheapReadWriteLock),
new PartitionDestructionLockManager()
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
index f27fe2e0059..7bd791f1e88 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.persistence.PageHeader;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
@@ -59,8 +60,7 @@ public class ItBplusTreeReuseListPersistentPageMemoryTest
extends AbstractBplusT
LongStream.range(0, CPUS).map(i -> MAX_MEMORY_SIZE /
CPUS).toArray(),
10 * MiB,
new TestPageReadWriteManager(),
- (fullPageId, buf, tag) -> {
- },
+ (fullPageId, buf, tag) -> PageWriteTarget.NONE,
mockCheckpointTimeoutLock(true),
wrapLock(new
OffheapReadWriteLock(OffheapReadWriteLock.DEFAULT_CONCURRENCY_LEVEL)),
new PartitionDestructionLockManager()
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/WriteDirtyPage.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageWriteTarget.java
similarity index 58%
copy from
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/WriteDirtyPage.java
copy to
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageWriteTarget.java
index d252cae5b50..bd21eacf8aa 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/WriteDirtyPage.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageWriteTarget.java
@@ -17,21 +17,16 @@
package org.apache.ignite.internal.pagememory.persistence;
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.pagememory.FullPageId;
-
/**
- * Interface which allows writing dirty page.
+ * Indicates which file a page write operation targeted.
*/
-public interface WriteDirtyPage {
- /**
- * Writes the page to the page store.
- *
- * @param pageMemory Page memory.
- * @param fullPageId Full page id.
- * @param buffer Byte buffer to write from.
- * @throws IgniteInternalCheckedException If failed.
- */
- void write(PersistentPageMemory pageMemory, FullPageId fullPageId,
ByteBuffer buffer) throws IgniteInternalCheckedException;
+public enum PageWriteTarget {
+ /** Page was written to the delta file. */
+ DELTA_FILE,
+
+ /** Page was written to the main file. */
+ MAIN_FILE,
+
+ /** Page write was skipped (e.g., partition being destroyed). */
+ NONE
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index acf29f27ec0..23176422d06 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -266,7 +266,7 @@ public class PersistentPageMemory implements PageMemory {
(pageMemory, fullPageId, buffer) -> {
metrics.incrementWriteToDiskMetric();
- flushDirtyPageForReplacement.write(pageMemory, fullPageId,
buffer);
+ return flushDirtyPageForReplacement.write(pageMemory,
fullPageId, buffer);
},
LOG,
sizes.length - 1,
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/WriteDirtyPage.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/WriteDirtyPage.java
index d252cae5b50..68a9da29f98 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/WriteDirtyPage.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/WriteDirtyPage.java
@@ -31,7 +31,8 @@ public interface WriteDirtyPage {
* @param pageMemory Page memory.
* @param fullPageId Full page id.
* @param buffer Byte buffer to write from.
+ * @return Target file where the page was written.
* @throws IgniteInternalCheckedException If failed.
*/
- void write(PersistentPageMemory pageMemory, FullPageId fullPageId,
ByteBuffer buffer) throws IgniteInternalCheckedException;
+ PageWriteTarget write(PersistentPageMemory pageMemory, FullPageId
fullPageId, ByteBuffer buffer) throws IgniteInternalCheckedException;
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 5a12f566d9c..98d97164451 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -312,9 +313,10 @@ public class CheckpointManager {
* @param pageMemory Page memory.
* @param pageId Page ID.
* @param pageBuf Page buffer to write from.
+ * @return Target file where the page was written.
* @throws IgniteInternalCheckedException If page writing failed (IO error
occurred).
*/
- public void writePageToFilePageStore(
+ public PageWriteTarget writePageToFilePageStore(
PersistentPageMemory pageMemory,
FullPageId pageId,
ByteBuffer pageBuf
@@ -323,13 +325,13 @@ public class CheckpointManager {
// If the partition is deleted (or will be soon), then such writes to
the disk should be skipped.
if (filePageStore == null || filePageStore.isMarkedToDestroy()) {
- return;
+ return PageWriteTarget.NONE;
}
if (pageId.pageIdx() >= filePageStore.checkpointedPageCount()) {
filePageStore.write(pageId.pageId(), pageBuf);
- return;
+ return PageWriteTarget.MAIN_FILE;
}
CheckpointProgress lastCheckpointProgress = lastCheckpointProgress();
@@ -363,6 +365,8 @@ public class CheckpointManager {
);
deltaFilePageStoreFuture.join().write(pageId.pageId(), pageBuf);
+
+ return PageWriteTarget.DELTA_FILE;
}
/**
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index ebfdeb867f5..ba55968136e 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -45,6 +44,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.PartitionMetaSnapshot;
@@ -93,8 +93,8 @@ public class CheckpointPagesWriter implements Runnable {
*/
private final List<PersistentPageMemory> pageMemoryList;
- /** Updated partitions -> count of written pages. */
- private final ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions;
+ /** Updated partitions -> write statistics. */
+ private final ConcurrentMap<GroupPartitionId, PartitionWriteStats>
updatedPartitions;
/** Future which should be finished when all pages would be written. */
private final CompletableFuture<?> doneFut;
@@ -143,7 +143,7 @@ public class CheckpointPagesWriter implements Runnable {
CheckpointMetricsTracker tracker,
IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> dirtyPartitionQueue,
List<PersistentPageMemory> pageMemoryList,
- ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
+ ConcurrentMap<GroupPartitionId, PartitionWriteStats>
updatedPartitions,
CompletableFuture<?> doneFut,
Runnable updateHeartbeat,
ThreadLocal<ByteBuffer> threadBuf,
@@ -366,7 +366,7 @@ public class CheckpointPagesWriter implements Runnable {
// We deliberately avoid "computeIfAbsent" here for the sake of
performance.
// For the overwhelming amount of calls "partitionId" should already
be in the set.
if (!updatedPartitions.containsKey(partitionId)) {
- updatedPartitions.putIfAbsent(partitionId, new LongAdder());
+ updatedPartitions.putIfAbsent(partitionId, new
PartitionWriteStats());
}
}
@@ -408,12 +408,32 @@ public class CheckpointPagesWriter implements Runnable {
checkpointProgress.writtenPagesCounter().incrementAndGet();
- pageWriter.write(pageMemory, fullPageId, buf);
+ PageWriteTarget target = pageWriter.write(pageMemory, fullPageId,
buf);
-
updatedPartitions.get(GroupPartitionId.convert(fullPageId)).increment();
+ recordPageWrite(target, GroupPartitionId.convert(fullPageId));
};
}
+ /**
+ * Records a page write to the appropriate file based on the write target.
+ *
+ * @param target The target file where the page was written.
+ * @param partitionId Partition ID.
+ */
+ private void recordPageWrite(PageWriteTarget target, GroupPartitionId
partitionId) {
+ if (target == PageWriteTarget.NONE) {
+ return;
+ }
+
+ PartitionWriteStats writeStats = updatedPartitions.get(partitionId);
+
+ writeStats.recordWrite();
+
+ if (target == PageWriteTarget.MAIN_FILE) {
+ writeStats.recordMainFileWrite();
+ }
+ }
+
private void writePartitionMeta(
PersistentPageMemory pageMemory,
GroupPartitionId partitionId,
@@ -436,11 +456,11 @@ public class CheckpointPagesWriter implements Runnable {
partitionMeta.partitionGeneration()
);
- pageWriter.write(pageMemory, fullPageId, buffer.rewind());
+ PageWriteTarget target = pageWriter.write(pageMemory, fullPageId,
buffer.rewind());
checkpointProgress.writtenPagesCounter().incrementAndGet();
- updatedPartitions.get(partitionId).increment();
+ recordPageWrite(target, partitionId);
updateHeartbeat.run();
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index d003ab8ff87..85247607492 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -22,7 +22,6 @@ import java.nio.ByteOrder;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
@@ -99,7 +98,7 @@ public class CheckpointPagesWriterFactory {
CheckpointMetricsTracker tracker,
IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> dirtyPartitionQueue,
List<PersistentPageMemory> pageMemoryList,
- ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
+ ConcurrentMap<GroupPartitionId, PartitionWriteStats>
updatedPartitions,
CompletableFuture<?> doneWriteFut,
Runnable updateHeartbeat,
CheckpointProgressImpl checkpointProgress,
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index f2c86d570ab..c3d4aaaea30 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -44,7 +44,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.components.LogSyncer;
@@ -497,7 +496,7 @@ public class Checkpointer extends IgniteWorker {
int checkpointWritePageThreads = pageWritePool == null ? 1 :
pageWritePool.getMaximumPoolSize();
// Updated partitions.
- ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions = new
ConcurrentHashMap<>();
+ ConcurrentMap<GroupPartitionId, PartitionWriteStats> updatedPartitions
= new ConcurrentHashMap<>();
CompletableFuture<?>[] futures = new
CompletableFuture[checkpointWritePageThreads];
@@ -577,13 +576,13 @@ public class Checkpointer extends IgniteWorker {
}
private void syncUpdatedPageStores(
- ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
+ ConcurrentMap<GroupPartitionId, PartitionWriteStats>
updatedPartitions,
CheckpointProgressImpl currentCheckpointProgress
) throws IgniteInternalCheckedException {
ThreadPoolExecutor pageWritePool = checkpointWritePagesPool;
if (pageWritePool == null) {
- for (Map.Entry<GroupPartitionId, LongAdder> entry :
updatedPartitions.entrySet()) {
+ for (Map.Entry<GroupPartitionId, PartitionWriteStats> entry :
updatedPartitions.entrySet()) {
if (shutdownNow) {
return;
}
@@ -599,13 +598,13 @@ public class Checkpointer extends IgniteWorker {
futures[i] = new CompletableFuture<>();
}
- BlockingQueue<Entry<GroupPartitionId, LongAdder>> queue = new
LinkedBlockingQueue<>(updatedPartitions.entrySet());
+ BlockingQueue<Entry<GroupPartitionId, PartitionWriteStats>> queue
= new LinkedBlockingQueue<>(updatedPartitions.entrySet());
for (int i = 0; i < checkpointThreads; i++) {
int threadIdx = i;
pageWritePool.execute(() -> {
- Map.Entry<GroupPartitionId, LongAdder> entry =
queue.poll();
+ Map.Entry<GroupPartitionId, PartitionWriteStats> entry =
queue.poll();
try {
while (entry != null) {
@@ -638,7 +637,7 @@ public class Checkpointer extends IgniteWorker {
private void fsyncPartitionFiles(
CheckpointProgressImpl currentCheckpointProgress,
GroupPartitionId partitionId,
- LongAdder pagesWritten
+ PartitionWriteStats writeStats
) throws IgniteInternalCheckedException {
FilePageStore filePageStore =
filePageStoreManager.getStore(partitionId);
@@ -658,15 +657,19 @@ public class Checkpointer extends IgniteWorker {
return;
}
+ // Always sync delta file (delta file always receives writes in
the current system).
fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore,
currentCheckpointProgress);
- fsyncFilePageStoreOnCheckpointThread(filePageStore,
currentCheckpointProgress);
+ // Conditionally sync main file only if it received writes.
+ if (writeStats.hasMainFileWrites()) {
+ fsyncFilePageStoreOnCheckpointThread(filePageStore,
currentCheckpointProgress);
+ }
renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
filePageStore.checkpointedPageCount(meta.metaSnapshot(currentCheckpointProgress.id()).pageCount());
-
currentCheckpointProgress.syncedPagesCounter().addAndGet(pagesWritten.intValue());
+
currentCheckpointProgress.syncedPagesCounter().addAndGet(writeStats.getTotalWrites());
} finally {
partitionDestructionLock.unlock();
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/PartitionWriteStats.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/PartitionWriteStats.java
new file mode 100644
index 00000000000..c6764385486
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/PartitionWriteStats.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Tracks write statistics for a partition during checkpoint, including total
writes
+ * and whether the main file received writes. Delta file writes are always
assumed
+ * to occur in the current system.
+ *
+ * <p>This class is thread-safe for use by multiple checkpoint writer threads.
+ */
+class PartitionWriteStats {
+ /** Total number of writes to this partition. */
+ private final LongAdder totalWrites = new LongAdder();
+
+ /** Whether the main file received any writes. */
+ private final AtomicBoolean hasMainFileWrites = new AtomicBoolean(false);
+
+ /**
+ * Records a page write.
+ */
+ void recordWrite() {
+ totalWrites.increment();
+ }
+
+ /**
+ * Records that the main file received a write.
+ */
+ void recordMainFileWrite() {
+ hasMainFileWrites.set(true);
+ }
+
+ /**
+ * Returns the total number of writes to this partition.
+ */
+ int getTotalWrites() {
+ return totalWrites.intValue();
+ }
+
+ /**
+ * Returns whether the main file received any writes.
+ */
+ boolean hasMainFileWrites() {
+ return hasMainFileWrites.get();
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
index dcda2102c47..ffa7788be67 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageWrite.java
@@ -146,6 +146,7 @@ public class DelayedDirtyPageWrite {
partitionDestructionLock.lock();
try {
+ // Return value not needed for page replacement writes
flushDirtyPage.write(pageMemory, fullPageId,
byteBufThreadLoc.get());
} catch (Throwable t) {
errorOnWrite = t;
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
index ec4639c6641..95ace81fa0a 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterTest.java
@@ -39,7 +39,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -56,13 +56,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.LongAdder;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.TestPageIoModule.TestPageIo;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -121,7 +121,7 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
WriteDirtyPage pageWriter = createDirtyPageWriter(writtenFullPageIds);
- ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions = new
ConcurrentHashMap<>();
+ ConcurrentMap<GroupPartitionId, PartitionWriteStats> updatedPartitions
= new ConcurrentHashMap<>();
CompletableFuture<?> doneFuture = new CompletableFuture<>();
@@ -167,8 +167,8 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
assertThat(updatedPartitions.keySet(),
containsInAnyOrder(groupPartId0, groupPartId1));
- assertThat(updatedPartitions.get(groupPartId0).sum(), equalTo(6L));
- assertThat(updatedPartitions.get(groupPartId1).sum(), equalTo(2L));
+ assertThat(updatedPartitions.get(groupPartId0).getTotalWrites(),
equalTo(6));
+ assertThat(updatedPartitions.get(groupPartId1).getTotalWrites(),
equalTo(2));
assertThat(tracker.dataPagesWritten(), equalTo(4));
assertThat(progressImpl.writtenPagesCounter().get(), equalTo(8));
@@ -272,7 +272,7 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
GroupPartitionId groupPartId = groupPartId(0, 0);
- ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions = new
ConcurrentHashMap<>();
+ ConcurrentMap<GroupPartitionId, PartitionWriteStats> updatedPartitions
= new ConcurrentHashMap<>();
CheckpointProgressImpl checkpointProgress = new
CheckpointProgressImpl(0);
checkpointProgress.pagesToWrite(checkpointDirtyPages);
@@ -365,9 +365,11 @@ public class CheckpointPagesWriterTest extends
BaseIgniteAbstractTest {
) throws Exception {
WriteDirtyPage writer = mock(WriteDirtyPage.class);
- if (fullPageIdArgumentCaptor != null) {
- doNothing().when(writer).write(any(PersistentPageMemory.class),
fullPageIdArgumentCaptor.capture(), any(ByteBuffer.class));
- }
+ doReturn(PageWriteTarget.MAIN_FILE).when(writer).write(
+ any(),
+ fullPageIdArgumentCaptor != null ?
fullPageIdArgumentCaptor.capture() : any(),
+ any()
+ );
return writer;
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 3bbfbbbabc0..fc997c6ed1e 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -567,9 +568,12 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
private static CheckpointPagesWriterFactory
createCheckpointPagesWriterFactory(
PartitionMetaManager partitionMetaManager,
PartitionDestructionLockManager partitionDestructionLockManager
- ) {
+ ) throws Exception {
+ WriteDirtyPage writeDirtyPage = mock(WriteDirtyPage.class);
+ when(writeDirtyPage.write(any(), any(),
any())).thenReturn(PageWriteTarget.MAIN_FILE);
+
return new CheckpointPagesWriterFactory(
- mock(WriteDirtyPage.class),
+ writeDirtyPage,
ioRegistry,
partitionMetaManager,
PAGE_SIZE,
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
index 222387033cc..60db4a7de08 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/throttling/PageMemoryThrottlingTest.java
@@ -61,6 +61,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import
org.apache.ignite.internal.pagememory.persistence.FakePartitionMeta.FakePartitionMetaFactory;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
@@ -170,13 +171,15 @@ public class PageMemoryThrottlingTest extends
IgniteAbstractTest {
CHECKPOINT_BUFFER_SIZE,
pageStoreManager,
(pageMem, pageId, pageBuf) -> {
- checkpointManager.writePageToFilePageStore(pageMem,
pageId, pageBuf);
+ PageWriteTarget target =
checkpointManager.writePageToFilePageStore(pageMem, pageId, pageBuf);
// Almost the same code that happens in data region, but
here the region is mocked.
CheckpointProgress checkpointProgress =
checkpointManager.currentCheckpointProgress();
assertNotNull(checkpointProgress);
checkpointProgress.evictedPagesCounter().incrementAndGet();
+
+ return target;
},
checkpointManager.checkpointTimeoutLock(),
new OffheapReadWriteLock(2),
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index 617e8bfb9ac..2e496eb9004 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionC
import org.apache.ignite.internal.pagememory.configuration.ReplacementMode;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
@@ -338,16 +339,18 @@ public class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPage
}
}
- private void flushDirtyPageOnReplacement(
+ private PageWriteTarget flushDirtyPageOnReplacement(
PersistentPageMemory pageMemory, FullPageId fullPageId, ByteBuffer
byteBuffer
) throws IgniteInternalCheckedException {
- checkpointManager.writePageToFilePageStore(pageMemory, fullPageId,
byteBuffer);
+ PageWriteTarget target =
checkpointManager.writePageToFilePageStore(pageMemory, fullPageId, byteBuffer);
CheckpointProgress checkpointProgress =
checkpointManager.currentCheckpointProgress();
if (checkpointProgress != null) {
checkpointProgress.evictedPagesCounter().incrementAndGet();
}
+
+ return target;
}
/** {@inheritDoc} */
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
index 61b5ffe6e59..c11a1b0f2fd 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
@@ -73,6 +73,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.DirtyFullPageId;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
+import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
import
org.apache.ignite.internal.pagememory.persistence.PartitionDestructionLockManager;
import
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.PartitionMetaSnapshot;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
@@ -376,7 +377,10 @@ public class PersistentPageMemoryNoLoadTest extends
AbstractPageMemoryNoLoadSelf
defaultCheckpointBufferSize(),
filePageStoreManager,
checkpointManager,
- (pageMemory0, fullPageId, buffer) ->
flushDirtyPageForReplacementFuture.complete(null)
+ (pageMemory0, fullPageId, buffer) -> {
+ flushDirtyPageForReplacementFuture.complete(null);
+ return PageWriteTarget.NONE;
+ }
);
dataRegions.add(new TestDataRegion<>(pageMemory));