This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 77af4770471 IGNITE-24546 Port target ratio based throttling from
Apache Ignite 2 (#5289)
77af4770471 is described below
commit 77af4770471a5d35e5662ebd8119cc9b77b61055
Author: Ivan Bessonov <[email protected]>
AuthorDate: Tue Feb 25 17:04:36 2025 +0300
IGNITE-24546 Port target ratio based throttling from Apache Ignite 2 (#5289)
---
.../ItBplusTreePersistentPageMemoryTest.java | 1 +
...BplusTreeReuseListPersistentPageMemoryTest.java | 1 +
.../persistence/PersistentPageMemory.java | 115 +++++++++----
.../checkpoint/CheckpointDirtyPages.java | 2 +-
.../persistence/checkpoint/CheckpointManager.java | 4 +
.../persistence/checkpoint/CheckpointProgress.java | 5 +
.../checkpoint/CheckpointProgressImpl.java | 5 +
.../persistence/checkpoint/Checkpointer.java | 4 +
.../CheckpointBufferOverflowWatchdog.java | 52 ++++++
.../throttling/CheckpointLockStateChecker.java | 29 ++++
.../persistence/throttling/ExponentialBackoff.java | 74 +++++++++
.../ExponentialBackoffThrottlingStrategy.java | 56 +++++++
.../persistence/throttling/PagesWriteThrottle.java | 182 +++++++++++++++++++++
.../throttling/PagesWriteThrottlePolicy.java | 97 +++++++++++
.../replacement/AbstractPageReplacementTest.java | 1 +
.../pagememory/PersistentPageMemoryDataRegion.java | 5 +
.../pagememory/PersistentPageMemoryNoLoadTest.java | 1 +
17 files changed, 598 insertions(+), 36 deletions(-)
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
index 1fe995ab7e0..1ef0ba65492 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreePersistentPageMemoryTest.java
@@ -88,6 +88,7 @@ public class ItBplusTreePersistentPageMemoryTest extends
AbstractBplusTreePageMe
(fullPageId, buf, tag) -> {
},
mockCheckpointTimeoutLock(true),
+ () -> null,
PAGE_SIZE,
offheapReadWriteLock
);
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
index 995775713c0..3c0295da134 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/persistence/ItBplusTreeReuseListPersistentPageMemoryTest.java
@@ -71,6 +71,7 @@ public class ItBplusTreeReuseListPersistentPageMemoryTest
extends AbstractBplusT
(fullPageId, buf, tag) -> {
},
mockCheckpointTimeoutLock(true),
+ () -> null,
PAGE_SIZE,
new OffheapReadWriteLock(128)
);
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 65751506721..8fc66ddcd3d 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -37,6 +37,7 @@ import static
org.apache.ignite.internal.pagememory.persistence.PageHeader.readP
import static
org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
import static
org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
import static
org.apache.ignite.internal.pagememory.persistence.PagePool.SEGMENT_INDEX_MASK;
+import static
org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD;
import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
@@ -45,11 +46,11 @@ import static
org.apache.ignite.internal.util.ArrayUtils.concat;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
-import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
-import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
+import static org.apache.ignite.internal.util.GridUnsafe.decrementAndGetInt;
import static org.apache.ignite.internal.util.GridUnsafe.getInt;
import static org.apache.ignite.internal.util.GridUnsafe.getLong;
+import static org.apache.ignite.internal.util.GridUnsafe.incrementAndGetInt;
import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
import static org.apache.ignite.internal.util.GridUnsafe.zeroMemory;
@@ -75,6 +76,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -101,6 +103,7 @@ import
org.apache.ignite.internal.pagememory.persistence.replacement.PageReplace
import
org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicyFactory;
import
org.apache.ignite.internal.pagememory.persistence.replacement.RandomLruPageReplacementPolicyFactory;
import
org.apache.ignite.internal.pagememory.persistence.replacement.SegmentedLruPageReplacementPolicyFactory;
+import
org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.OffheapReadWriteLock;
import org.jetbrains.annotations.Nullable;
@@ -155,18 +158,15 @@ public class PersistentPageMemory implements PageMemory {
/** Try again tag. */
public static final int TRY_AGAIN_TAG = -1;
- /**
- * Threshold of the checkpoint buffer. We should start forcefully
checkpointing its pages upon exceeding it. The value of {@code 2/3} is
- * ported from {@code Ignite 2.x}.
- */
- private static final float CP_BUF_FILL_THRESHOLD = 2.0f / 3;
-
/** Data region configuration view. */
private final PersistentPageMemoryProfileView storageProfileView;
/** Page IO registry. */
private final PageIoRegistry ioRegistry;
+ /** The supplier of checkpoint progress. */
+ private final Supplier<@Nullable CheckpointProgress>
checkpointProgressSupplier;
+
/** Page manager. */
private final PageReadWriteManager pageStoreManager;
@@ -180,8 +180,7 @@ public class PersistentPageMemory implements PageMemory {
private final DirectMemoryProvider directMemoryProvider;
/** Segments array, {@code null} if not {@link #start() started}. */
- @Nullable
- private volatile Segment[] segments;
+ private volatile Segment @Nullable [] segments;
/** Lock for segments changes. */
private final Object segmentsLock = new Object();
@@ -213,6 +212,9 @@ public class PersistentPageMemory implements PageMemory {
/** Checkpoint page pool, {@code null} if not {@link #start() started}. */
private volatile @Nullable PagePool checkpointPool;
+ /** Pages write throttle. */
+ private final @Nullable PagesWriteThrottlePolicy writeThrottle;
+
/**
* Delayed page replacement (rotation with disk) tracker. Because other
thread may require exactly the same page to be loaded from
* store, reads are protected by locking.
@@ -233,6 +235,7 @@ public class PersistentPageMemory implements PageMemory {
* @param changeTracker Callback invoked to track changes in pages.
* @param flushDirtyPageForReplacement Write callback invoked when a dirty
page is removed for replacement.
* @param checkpointTimeoutLock Checkpoint timeout lock.
+ * @param checkpointProgressSupplier The supplier of checkpoint progress.
* @param pageSize Page size in bytes.
* @param rwLock Read-write lock for pages.
*/
@@ -245,12 +248,14 @@ public class PersistentPageMemory implements PageMemory {
@Nullable PageChangeTracker changeTracker,
WriteDirtyPage flushDirtyPageForReplacement,
CheckpointTimeoutLock checkpointTimeoutLock,
+ Supplier<@Nullable CheckpointProgress> checkpointProgressSupplier,
// TODO: IGNITE-17017 Move to common config
int pageSize,
OffheapReadWriteLock rwLock
) {
this.storageProfileView = (PersistentPageMemoryProfileView)
storageProfileConfiguration.value();
this.ioRegistry = ioRegistry;
+ this.checkpointProgressSupplier = checkpointProgressSupplier;
this.sizes = concat(segmentSizes, checkpointBufferSize);
this.pageStoreManager = pageStoreManager;
this.changeTracker = changeTracker;
@@ -282,6 +287,15 @@ public class PersistentPageMemory implements PageMemory {
}
delayedPageReplacementTracker = new
DelayedPageReplacementTracker(pageSize, flushDirtyPageForReplacement, LOG,
sizes.length - 1);
+
+ // TODO IGNITE-24548 Use this initialization code.
+ // new PagesWriteThrottle(
+ // this,
+ // checkpointProgressSupplier,
+ // checkpointTimeoutLock::checkpointLockIsHeldByThread,
+ // false
+ // );
+ writeThrottle = null;
}
/** {@inheritDoc} */
@@ -519,6 +533,10 @@ public class PersistentPageMemory implements PageMemory {
assert started;
assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
+ if (writeThrottle != null) {
+ writeThrottle.onMarkDirty(false);
+ }
+
long pageId = pageStoreManager.allocatePage(grpId, partId, flags);
// We need to allocate page in memory for marking it dirty to save it
in the next checkpoint.
@@ -1183,6 +1201,10 @@ public class PersistentPageMemory implements PageMemory {
assert getVersion(page + PAGE_OVERHEAD) != 0 :
dumpPage(pageId, fullId.groupId());
assert getType(page + PAGE_OVERHEAD) != 0 : hexLong(pageId);
+
+ if (writeThrottle != null && !restore && !wasDirty &&
markDirty) {
+ writeThrottle.onMarkDirty(isInCheckpoint(fullId));
+ }
} catch (AssertionError ex) {
LOG.debug("Failed to unlock page [fullPageId={}, binPage={}]",
fullId, toHexString(page, systemPageSize()));
@@ -1459,13 +1481,13 @@ public class PersistentPageMemory implements PageMemory
{
private void acquirePage(long absPtr) {
PageHeader.acquirePage(absPtr);
- updateAtomicInt(acquiredPagesPtr, 1);
+ incrementAndGetInt(acquiredPagesPtr);
}
private void releasePage(long absPtr) {
PageHeader.releasePage(absPtr);
- updateAtomicInt(acquiredPagesPtr, -1);
+ decrementAndGetInt(acquiredPagesPtr);
}
/**
@@ -1759,29 +1781,15 @@ public class PersistentPageMemory implements PageMemory
{
public CheckpointPages checkpointPages() {
return checkpointPages;
}
- }
-
- private static int updateAtomicInt(long ptr, int delta) {
- while (true) {
- int old = getInt(ptr);
-
- int updated = old + delta;
-
- if (compareAndSwapInt(null, ptr, old, updated)) {
- return updated;
- }
- }
- }
- private static long updateAtomicLong(long ptr, long delta) {
- while (true) {
- long old = getLong(ptr);
-
- long updated = old + delta;
-
- if (compareAndSwapLong(null, ptr, old, updated)) {
- return updated;
- }
+ /**
+ * Checks if segment is sufficiently full.
+ *
+ * @param dirtyRatioThreshold Max allowed dirty pages ratio.
+ */
+ boolean shouldThrottle(double dirtyRatioThreshold) {
+ // Comparison using multiplication is faster than comparison using
division.
+ return dirtyPagesCntr.doubleValue() > dirtyRatioThreshold *
pages();
}
}
@@ -1816,6 +1824,27 @@ public class PersistentPageMemory implements PageMemory {
return checkpointUrgency.get();
}
+ /**
+ * Checks if region is sufficiently full.
+ *
+ * @param dirtyRatioThreshold Max allowed dirty pages ratio.
+ */
+ public boolean shouldThrottle(double dirtyRatioThreshold) {
+ Segment[] segments = this.segments;
+
+ if (segments == null) {
+ return false;
+ }
+
+ for (Segment segment : segments) {
+ if (segment.shouldThrottle(dirtyRatioThreshold)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
* Returns number of pages used in checkpoint buffer.
*/
@@ -1835,7 +1864,11 @@ public class PersistentPageMemory implements PageMemory {
}
private void releaseCheckpointBufferPage(long tmpBufPtr) {
- checkpointPool.releaseFreePage(tmpBufPtr);
+ int resultCounter = checkpointPool.releaseFreePage(tmpBufPtr);
+
+ if (writeThrottle != null && resultCounter == checkpointPool.pages() /
2) {
+ writeThrottle.wakeupThrottledThreads();
+ }
}
/**
@@ -2124,6 +2157,10 @@ public class PersistentPageMemory implements PageMemory {
checkpointUrgency.set(NOT_REQUIRED);
+ if (writeThrottle != null) {
+ writeThrottle.onBeginCheckpoint();
+ }
+
return CollectionUtils.concat(dirtyPageIds);
}
@@ -2140,12 +2177,20 @@ public class PersistentPageMemory implements PageMemory
{
seg.checkpointPages = null;
}
}
+
+ if (writeThrottle != null) {
+ writeThrottle.onFinishCheckpoint();
+ }
}
/**
* Checks if the Checkpoint Buffer is currently close to exhaustion.
*/
public boolean isCpBufferOverflowThresholdExceeded() {
+ if (writeThrottle != null) {
+ return writeThrottle.isCpBufferOverflowThresholdExceeded();
+ }
+
assert started;
PagePool checkpointPool = this.checkpointPool;
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
index 86086ffa8f6..6341aecec15 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java
@@ -32,7 +32,7 @@ import
org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
import org.jetbrains.annotations.Nullable;
/** Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and partition IDs that should be checkpointed. */
-class CheckpointDirtyPages {
+public class CheckpointDirtyPages {
/** Dirty page ID comparator by groupId -> partitionId -> pageIdx. */
static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
.comparingInt(FullPageId::groupId)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 4d85b720874..d80b4355cfc 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -243,6 +243,10 @@ public class CheckpointManager {
return checkpointer.scheduleCheckpoint(delayMillis, reason);
}
+ public @Nullable CheckpointProgress currentCheckpointProgress() {
+ return checkpointer.currentCheckpointProgress();
+ }
+
/**
* Returns the progress of the last checkpoint, or the current checkpoint
if in progress, {@code null} if no checkpoint has occurred.
*/
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
index 0d4a5f754d0..209978b5209 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
@@ -55,4 +55,9 @@ public interface CheckpointProgress {
* written.
*/
@Nullable CheckpointDirtyPages pagesToWrite();
+
+ /**
+ * Returns the number of written checkpoint pages.
+ */
+ int writtenPages();
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
index f8a5c5abc9a..cbcbbeca38a 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -145,6 +145,11 @@ class CheckpointProgressImpl implements CheckpointProgress
{
return writtenPagesCntr;
}
+ @Override
+ public int writtenPages() {
+ return writtenPagesCntr.get();
+ }
+
/**
* Returns counter for fsynced checkpoint pages.
*/
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 3cac430c33a..5abce1812cf 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -772,6 +772,10 @@ public class Checkpointer extends IgniteWorker {
}
}
+ @Nullable CheckpointProgress currentCheckpointProgress() {
+ return currentCheckpointProgress;
+ }
+
/**
* Returns the progress of the last checkpoint, or the current checkpoint
if in progress, {@code null} if no checkpoint has occurred.
*/
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointBufferOverflowWatchdog.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointBufferOverflowWatchdog.java
new file mode 100644
index 00000000000..db05001a6f9
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointBufferOverflowWatchdog.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import static
org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy.CP_BUF_FILL_THRESHOLD;
+
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+
+/**
+ * Logic used to determine whether Checkpoint Buffer is in danger zone and
writer threads should be throttled.
+ */
+class CheckpointBufferOverflowWatchdog {
+ /** Page memory. */
+ private final PersistentPageMemory pageMemory;
+
+ /**
+ * Creates a new instance.
+ *
+ * @param pageMemory Page memory to use.
+ */
+ CheckpointBufferOverflowWatchdog(PersistentPageMemory pageMemory) {
+ this.pageMemory = pageMemory;
+ }
+
+ /**
+ * Returns true if Checkpoint Buffer is in danger zone (more than
+ * {@link PagesWriteThrottlePolicy#CP_BUF_FILL_THRESHOLD} of the buffer is
filled) and, hence, writer threads need
+ * to be throttled.
+ *
+ * @return {@code true} iff Checkpoint Buffer is in danger zone.
+ */
+ boolean isInDangerZone() {
+ int checkpointBufLimit = (int) (pageMemory.maxCheckpointBufferPages()
* CP_BUF_FILL_THRESHOLD);
+
+ return pageMemory.usedCheckpointBufferPages() > checkpointBufLimit;
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointLockStateChecker.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointLockStateChecker.java
new file mode 100644
index 00000000000..41078fbf1a5
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/CheckpointLockStateChecker.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+/**
+ * Interface for checking that the checkpoint lock is held by the current
thread.
+ */
+@FunctionalInterface
+public interface CheckpointLockStateChecker {
+ /**
+ * Returns {@code true} if checkpoint lock is held by current thread.
+ */
+ boolean checkpointLockIsHeldByThread();
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java
new file mode 100644
index 00000000000..bb710c2f549
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoff.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements exponential backoff logic. Contains a counter and increments it
on each {@link #nextDuration()}.
+ * May be reset using {@link #reset()}.
+ */
+class ExponentialBackoff {
+ /**
+ * Starting backoff duration.
+ */
+ private final long startingBackoffNanos;
+
+ /**
+ * Backoff ratio. Each subsequent duration is this many times longer than the previous one.
+ */
+ private final double backoffRatio;
+
+ /**
+ * Exponential backoff counter.
+ */
+ private final AtomicInteger exponentialBackoffCounter = new
AtomicInteger(0);
+
+ /**
+ * Constructs a new instance with the given parameters.
+ *
+ * @param startingBackoffNanos Duration of first backoff in nanoseconds.
+ * @param backoffRatio Each subsequent duration is this many times longer than the previous one.
+ */
+ public ExponentialBackoff(long startingBackoffNanos, double backoffRatio) {
+ this.startingBackoffNanos = startingBackoffNanos;
+ this.backoffRatio = backoffRatio;
+ }
+
+ /**
+ * Returns next backoff duration (in nanoseconds). As a side effect,
increments the backoff counter so that
+ * next call will return a longer duration.
+ *
+ * @return Next backoff duration in nanoseconds.
+ */
+ public long nextDuration() {
+ int exponent = exponentialBackoffCounter.getAndIncrement();
+ return (long) (startingBackoffNanos * Math.pow(backoffRatio,
exponent));
+ }
+
+ /**
+ * Resets the exponential backoff counter so that next call to {@link
#nextDuration()}
+ * will return {@link #startingBackoffNanos}.
+ *
+ * @return {@code true} iff this backoff was not already in a reset state.
+ */
+ public boolean reset() {
+ int oldValue = exponentialBackoffCounter.getAndSet(0);
+ return oldValue != 0;
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoffThrottlingStrategy.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoffThrottlingStrategy.java
new file mode 100644
index 00000000000..e67d84d27f3
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/ExponentialBackoffThrottlingStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+/**
+ * Logic used to protect memory (mainly, Checkpoint Buffer) from exhaustion
using exponential backoff.
+ */
+class ExponentialBackoffThrottlingStrategy {
+ /**
+ * Starting throttle time. Limits write speed to 1000 MB/s.
+ */
+ private static final long STARTING_THROTTLE_NANOS = 4000;
+
+ /**
+ * Backoff ratio. Each subsequent park is this many times longer than the previous one.
+ */
+ private static final double BACKOFF_RATIO = 1.05;
+
+ /**
+ * Exponential backoff used to throttle threads.
+ */
+ private final ExponentialBackoff backoff = new
ExponentialBackoff(STARTING_THROTTLE_NANOS, BACKOFF_RATIO);
+
+ /**
+ * Computes next duration (in nanos) to throttle a thread to protect
Checkpoint Buffer.
+ *
+ * @return park time in nanos
+ */
+ long protectionParkTime() {
+ return backoff.nextDuration();
+ }
+
+ /**
+ * Resets the backoff counter. Invoked when no throttling is needed
anymore.
+ *
+ * @return {@code true} iff the backoff was not already in a reset state
+ */
+ boolean resetBackoff() {
+ return backoff.reset();
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java
new file mode 100644
index 00000000000..ba35871aefa
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottle.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid throughput drops to zero that can happen if the checkpoint buffer overflows.
+ *
+ * <p>Two independent exponential backoffs are kept: one for pages that belong to the current checkpoint and one for
+ * pages that do not. Threads parked because of an in-checkpoint page are registered in a map so that they can be
+ * unparked before their park time elapses.
+ */
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
+ /** Logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(PagesWriteThrottle.class);
+
+ /** Page memory. */
+ private final PersistentPageMemory pageMemory;
+
+ /** Supplier of the current checkpoint progress; it yields {@code null} when no checkpoint is running. */
+ private final Supplier<CheckpointProgress> cpProgress;
+
+ /** If true, throttle will only protect from checkpoint buffer overflow, not from dirty pages ratio cap excess. */
+ private final boolean throttleOnlyPagesInCheckpoint;
+
+ /** Checkpoint lock state checker. */
+ private final CheckpointLockStateChecker stateChecker;
+
+ /** In-checkpoint protection logic. */
+ private final ExponentialBackoffThrottlingStrategy inCheckpointProtection
+ = new ExponentialBackoffThrottlingStrategy();
+
+ /** Not-in-checkpoint protection logic. */
+ private final ExponentialBackoffThrottlingStrategy
notInCheckpointProtection
+ = new ExponentialBackoffThrottlingStrategy();
+
+ /** Checkpoint Buffer-related logic used to keep it safe. */
+ private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;
+
+ /** Threads that are throttled due to checkpoint buffer overflow, keyed by thread ID. */
+ private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new
ConcurrentHashMap<>();
+
+ /**
+ * Constructor.
+ *
+ * @param pageMemory Page memory.
+ * @param cpProgress Supplier of the current checkpoint progress. Must not be {@code null} unless
+ * {@code throttleOnlyPagesInCheckpoint} is {@code true} (checked with an assertion).
+ * @param stateChecker Checkpoint lock state checker.
+ * @param throttleOnlyPagesInCheckpoint If {@code true}, throttle will only protect from checkpoint buffer overflow.
+ */
+ public PagesWriteThrottle(
+ PersistentPageMemory pageMemory,
+ Supplier<CheckpointProgress> cpProgress,
+ CheckpointLockStateChecker stateChecker,
+ boolean throttleOnlyPagesInCheckpoint
+ ) {
+ this.pageMemory = pageMemory;
+ this.cpProgress = cpProgress;
+ this.stateChecker = stateChecker;
+ this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+ cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);
+
+ assert throttleOnlyPagesInCheckpoint || cpProgress != null
+ : "cpProgress must be not null if ratio based throttling mode
is used";
+ }
+
+ /**
+ * Decides whether the calling thread has to be slowed down and, if so, parks it for the next backoff duration.
+ * Asserts that {@code stateChecker.checkpointLockIsHeldByThread()} returns {@code true}.
+ *
+ * @param isPageInCheckpoint Flag indicating if current page is in scope of current checkpoint.
+ */
+ @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+ assert stateChecker.checkpointLockIsHeldByThread();
+
+ boolean shouldThrottle = false;
+
+ // A page from the current checkpoint is throttled when the Checkpoint Buffer is in the danger zone.
+ if (isPageInCheckpoint) {
+ shouldThrottle = isCpBufferOverflowThresholdExceeded();
+ }
+
+ // Otherwise, unless ratio based throttling is disabled, compare dirty pages with a progress-dependent threshold.
+ if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
+ CheckpointProgress progress = cpProgress.get();
+
+ if (progress == null) {
+ return; // Don't throttle if checkpoint is not running.
+ }
+
+ int cpWrittenPages = progress.writtenPages();
+
+ int cpTotalPages = progress.currentCheckpointPagesCount();
+
+ if (cpWrittenPages == cpTotalPages) {
+ // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4.
+ shouldThrottle = pageMemory.shouldThrottle(0.75);
+ } else {
+ double dirtyRatioThreshold = ((double) cpWrittenPages) /
cpTotalPages;
+
+ // Starting with 0.05 to avoid throttle right after checkpoint start.
+ // 7/12 is maximum ratio of dirty pages.
+ dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7
/ 12;
+
+ shouldThrottle =
pageMemory.shouldThrottle(dirtyRatioThreshold);
+ }
+ }
+
+ // Backoff state is tracked separately for in-checkpoint and not-in-checkpoint pages.
+ ExponentialBackoffThrottlingStrategy exponentialThrottle =
isPageInCheckpoint
+ ? inCheckpointProtection : notInCheckpointProtection;
+
+ if (shouldThrottle) {
+ long throttleParkTimeNs = exponentialThrottle.protectionParkTime();
+
+ Thread curThread = Thread.currentThread();
+
+ if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+ LOG.warn("Parking thread=" + curThread.getName()
+ + " for timeout(ms)=" +
TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+ }
+
+ if (isPageInCheckpoint) {
+ // Register the thread so that unparkParkedThreads() can wake it up before the park time elapses.
+ cpBufThrottledThreads.put(curThread.getId(), curThread);
+
+ try {
+ LockSupport.parkNanos(throttleParkTimeNs);
+ } finally {
+ cpBufThrottledThreads.remove(curThread.getId());
+
+ if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+ LOG.warn("Unparking thread=" + curThread.getName()
+ + " with park timeout(ms)=" +
TimeUnit.NANOSECONDS.toMillis(throttleParkTimeNs));
+ }
+ }
+ } else {
+ // Threads parked for not-in-checkpoint pages are not registered, they simply wait for the timeout.
+ LockSupport.parkNanos(throttleParkTimeNs);
+ }
+ } else {
+ // No throttling is needed: reset the corresponding backoff.
+ boolean backoffWasAlreadyStarted =
exponentialThrottle.resetBackoff();
+
+ // If the in-checkpoint backoff had been started, release the threads that are still parked because of it.
+ if (isPageInCheckpoint && backoffWasAlreadyStarted) {
+ unparkParkedThreads();
+ }
+ }
+ }
+
+ /**
+ * Resets the in-checkpoint backoff and unparks the registered threads, but only if the Checkpoint Buffer overflow
+ * threshold is not exceeded at the moment of the call; otherwise does nothing.
+ */
+ @Override public void wakeupThrottledThreads() {
+ if (!isCpBufferOverflowThresholdExceeded()) {
+ inCheckpointProtection.resetBackoff();
+
+ unparkParkedThreads();
+ }
+ }
+
+ /** Unparks every thread currently registered in {@link #cpBufThrottledThreads}. */
+ private void unparkParkedThreads() {
+ cpBufThrottledThreads.values().forEach(LockSupport::unpark);
+ }
+
+ /** Does nothing in this implementation. */
+ @Override public void onBeginCheckpoint() {
+ }
+
+ /** Resets both the in-checkpoint and the not-in-checkpoint backoffs. */
+ @Override public void onFinishCheckpoint() {
+ inCheckpointProtection.resetBackoff();
+ notInCheckpointProtection.resetBackoff();
+ }
+
+ /**
+ * Delegates to the Checkpoint Buffer watchdog.
+ *
+ * @return {@code true} if the watchdog reports that the Checkpoint Buffer is in the danger zone.
+ */
+ @Override public boolean isCpBufferOverflowThresholdExceeded() {
+ return cpBufferWatchdog.isInDangerZone();
+ }
+}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottlePolicy.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottlePolicy.java
new file mode 100644
index 00000000000..201ab85ad99
--- /dev/null
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/throttling/PagesWriteThrottlePolicy.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.throttling;
+
+/**
+ * Throttling policy, encapsulates logic of delaying write operations.
+ *
+ * <p>There are two resources that get (or might get) consumed when writing:
+ * <ul>
+ * <li>
+ * <b>Checkpoint Buffer</b> where a page is placed when, being under checkpoint, it gets written
+ * </li>
+ * <li>
+ * <b>Clean pages</b> which get dirtied when writes occur
+ * </li>
+ * </ul>
+ * Both resources are limited in size. Both are freed when checkpoint finishes. This means that, if writers
+ * write too fast, they can consume any of these two resources before we have a chance to finish a checkpoint.
+ * If this happens, the cluster fails or stalls.
+ *
+ * <p>Write throttling solves this problem by slowing down the writers to a rate at which they do not exhaust
+ * any of the two resources.
+ *
+ * <p>An alternative to just slowing down is to wait in a loop till the resource we're after gets freed, and
+ * only then allow the write to happen. The problem with this approach is that we cannot wait in a loop/sleep
+ * under a write lock, so the logic would be a lot more complicated. Maybe in the future we'll follow this path,
+ * but for now, a simpler approach of just throttling is used (see below).
+ *
+ * <p>If we just slow writers down by throttling their writes, AND we have enough Checkpoint Buffer and pages in
+ * segments to take some load bursts, we are fine. Under such assumptions, it does not matter whether we throttle
+ * a writer thread before acquiring write lock or after it gets released; in the current implementation, this
+ * happens after write lock gets released (because it was considered simpler to implement).
+ *
+ * <p>The actual throttling happens when a page gets marked dirty by calling {@link #onMarkDirty(boolean)}.
+ *
+ * <p>There are two additional methods for interfacing with other parts of the system:
+ * <ul>
+ * <li>{@link #wakeupThrottledThreads()} which wakes up the threads currently being throttled; in the current
+ * implementation, it is called when Checkpoint Buffer utilization falls below 1/2.</li>
+ * <li>{@link #isCpBufferOverflowThresholdExceeded()} which is called by a checkpointer to see whether the
+ * Checkpoint Buffer is in a danger zone and, if yes, it starts to prioritize writing pages from the
+ * Checkpoint Buffer over pages from the normal checkpoint sequence.</li>
+ * </ul>
+ */
+public interface PagesWriteThrottlePolicy {
+ // TODO Maybe make it configurable in IGNITE-24548
+ /**
+ * Min park time, in nanoseconds, which triggers logging (10 seconds).
+ *
+ * <p>The value is compared with park durations that are measured in nanoseconds and passed to
+ * {@code LockSupport.parkNanos}, so it has to be expressed in nanoseconds as well. A bare {@code 10} would be
+ * exceeded by every park and would produce a warning for each throttled write.
+ */
+ long LOGGING_THRESHOLD = 10_000_000_000L;
+
+ /** Upper bound of the Checkpoint Buffer fill ratio. */
+ float CP_BUF_FILL_THRESHOLD = 2f / 3;
+
+ /**
+ * Callback to apply throttling delay.
+ *
+ * @param isPageInCheckpoint Flag indicating if current page is in scope of current checkpoint.
+ */
+ void onMarkDirty(boolean isPageInCheckpoint);
+
+ /**
+ * Callback to wakeup throttled threads. Invoked when the Checkpoint Buffer use drops below a certain
+ * threshold.
+ */
+ void wakeupThrottledThreads();
+
+ /**
+ * Callback to notify throttling policy checkpoint was started.
+ */
+ void onBeginCheckpoint();
+
+ /**
+ * Callback to notify throttling policy checkpoint was finished.
+ */
+ void onFinishCheckpoint();
+
+ /**
+ * Whether Checkpoint Buffer is currently close to exhaustion.
+ *
+ * @return {@code true} if measures like throttling to protect Checkpoint Buffer should be applied,
+ * and {@code false} otherwise.
+ */
+ boolean isCpBufferOverflowThresholdExceeded();
+}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
index c33e57ef31a..8ec66d8574e 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
@@ -166,6 +166,7 @@ public abstract class AbstractPageReplacementTest extends
IgniteAbstractTest {
null,
(pageMemory0, fullPageId, buf) ->
checkpointManager.writePageToDeltaFilePageStore(pageMemory0, fullPageId, buf,
true),
checkpointManager.checkpointTimeoutLock(),
+ () -> null,
PAGE_SIZE,
new OffheapReadWriteLock(128)
);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index a67119f352c..a3b34445083 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.internal.util.Constants.MiB;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.apache.ignite.internal.pagememory.DataRegion;
import
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
import
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileView;
@@ -29,6 +30,7 @@ import
org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.util.OffheapReadWriteLock;
@@ -101,6 +103,8 @@ class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPageMemory>
public void start() {
PersistentPageMemoryProfileView dataRegionConfigView =
(PersistentPageMemoryProfileView) cfg.value();
+ Supplier<CheckpointProgress> currentCheckpointProgress =
checkpointManager::currentCheckpointProgress;
+
PersistentPageMemory pageMemory = new PersistentPageMemory(
cfg,
ioRegistry,
@@ -110,6 +114,7 @@ class PersistentPageMemoryDataRegion implements
DataRegion<PersistentPageMemory>
null,
(pageMemory0, fullPageId, buf) ->
checkpointManager.writePageToDeltaFilePageStore(pageMemory0, fullPageId, buf,
true),
checkpointManager.checkpointTimeoutLock(),
+ currentCheckpointProgress,
pageSize,
new OffheapReadWriteLock(128)
);
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
index f2aa3ce9bdf..8a1885d1daf 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryNoLoadTest.java
@@ -466,6 +466,7 @@ public class PersistentPageMemoryNoLoadTest extends
AbstractPageMemoryNoLoadSelf
null,
flushDirtyPageForReplacement,
checkpointManager == null ? mockCheckpointTimeoutLock(true) :
checkpointManager.checkpointTimeoutLock(),
+ checkpointManager == null ? () -> null :
checkpointManager::currentCheckpointProgress,
PAGE_SIZE,
new OffheapReadWriteLock(128)
);