This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 04999c5 IGNITE-12452 Write pages from checkpoint buffer with higher
priority - Fixes #7145.
04999c5 is described below
commit 04999c518d4f22895247a7686879f2cdfdbcba4f
Author: Evgeny Stanilovskiy <[email protected]>
AuthorDate: Tue Dec 24 18:22:42 2019 +0300
IGNITE-12452 Write pages from checkpoint buffer with higher priority -
Fixes #7145.
---
.../apache/ignite/internal/pagemem/FullPageId.java | 3 +
.../GridCacheDatabaseSharedManager.java | 48 +++----
.../cache/persistence/pagemem/PageMemoryEx.java | 10 ++
.../cache/persistence/pagemem/PageMemoryImpl.java | 40 ++++++
.../cache/persistence/pagemem/PagePool.java | 4 +-
.../persistence/pagemem/PagesWriteThrottle.java | 9 +-
.../pagemem/PagesWriteThrottlePolicy.java | 7 +
.../pagemem/IgniteThrottlingUnitTest.java | 17 +--
.../persistence/pagemem/PageMemoryImplTest.java | 142 +++++++++++++++++++--
9 files changed, 227 insertions(+), 53 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
index 4c7d031..588f7c4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
@@ -49,6 +49,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
* Effective page ID is page ID with zeroed bits used for page ID rotation.
*/
public class FullPageId {
+ /** Sentinel denoting the absence of a page: both page ID and group ID are {@code -1}. */
+ public static final FullPageId NULL_PAGE = new FullPageId(-1, -1);
+
/** Page ID. */
private final long pageId;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a89919b..33a307c 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -4788,30 +4788,15 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
List<FullPageId> pagesToRetry = writePages(writePageIds);
if (pagesToRetry.isEmpty())
- doneFut.onDone((Void)null);
+ doneFut.onDone();
else {
LT.warn(log, pagesToRetry.size() + " checkpoint pages were
not written yet due to unsuccessful " +
"page write lock acquisition and will be retried");
- if (retryWriteExecutor == null) {
- while (!pagesToRetry.isEmpty())
- pagesToRetry = writePages(pagesToRetry);
+ while (!pagesToRetry.isEmpty())
+ pagesToRetry = writePages(pagesToRetry);
- doneFut.onDone((Void)null);
- }
- else {
- // Submit current retry pages to the end of the queue
to avoid starvation.
- WriteCheckpointPages retryWritesTask = new
WriteCheckpointPages(
- tracker,
- pagesToRetry,
- updStores,
- doneFut,
- totalPagesToWrite,
- beforePageWrite,
- retryWriteExecutor);
-
- retryWriteExecutor.submit(retryWritesTask);
- }
+ doneFut.onDone();
}
}
catch (Throwable e) {
@@ -4832,16 +4817,14 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
ByteBuffer tmpWriteBuf = threadBuf.get();
+ boolean throttlingEnabled = resolveThrottlingPolicy() !=
PageMemoryImpl.ThrottlingPolicy.DISABLED;
+
for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
break;
- tmpWriteBuf.rewind();
-
beforePageWrite.run();
- snapshotMgr.beforePageWrite(fullId);
-
int grpId = fullId.groupId();
PageMemoryEx pageMem;
@@ -4862,7 +4845,26 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
pageMem = (PageMemoryEx)region.pageMemory();
}
+ snapshotMgr.beforePageWrite(fullId);
+
+ tmpWriteBuf.rewind();
+
pageMem.checkpointWritePage(fullId, tmpWriteBuf,
pageStoreWriter, tracker);
+
+ if (throttlingEnabled) {
+ while (pageMem.shouldThrottle()) {
+ FullPageId cpPageId = pageMem.pullPageFromCpBuffer();
+
+ if (cpPageId.equals(FullPageId.NULL_PAGE))
+ break;
+
+ snapshotMgr.beforePageWrite(cpPageId);
+
+ tmpWriteBuf.rewind();
+
+ pageMem.checkpointWritePage(cpPageId, tmpWriteBuf,
pageStoreWriter, tracker);
+ }
+ }
}
return pagesToRetry;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index f9fdb0d..8465783 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -165,4 +165,14 @@ public interface PageMemoryEx extends PageMemory {
* @return Future that will be completed when all pages are cleared.
*/
public IgniteInternalFuture<Void> clearAsync(LoadedPagesMap.KeyPredicate
pred, boolean cleanDirty);
+
+ /**
+ * Pull page from checkpoint buffer.
+ */
+ public FullPageId pullPageFromCpBuffer();
+
+ /**
+ * Calculates throttling condition.
+ */
+ public boolean shouldThrottle();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index b0cc7bd..2f90d41 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -95,6 +95,8 @@ import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.internal.pagemem.FullPageId.NULL_PAGE;
+import static
org.apache.ignite.internal.processors.cache.persistence.pagemem.PagePool.SEGMENT_INDEX_MASK;
import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
/**
@@ -1265,6 +1267,8 @@ public class PageMemoryImpl implements PageMemoryEx {
copyInBuffer(tmpAbsPtr, buf);
+ PageHeader.fullPageId(tmpAbsPtr, NULL_PAGE);
+
GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(),
(byte)0);
if (tracker != null)
@@ -1616,6 +1620,8 @@ public class PageMemoryImpl implements PageMemoryEx {
PageHeader.dirty(absPtr, false);
PageHeader.tempBufferPointer(absPtr, tmpRelPtr);
+ // Store the full page ID in the temporary page header for the checkpoint buffer cleaner.
+ PageHeader.fullPageId(tmpAbsPtr, fullId);
assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
assert PageIO.getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; //TODO
GG-11480
@@ -1847,6 +1853,40 @@ public class PageMemoryImpl implements PageMemoryEx {
return memMetrics;
}
+ /** {@inheritDoc} */
+ @Override public boolean shouldThrottle() {
+ return writeThrottle.shouldThrottle();
+ }
+
+ /**
+ * Picks an arbitrary page from the checkpoint buffer; returns {@code NULL_PAGE} if no suitable page is found.
+ */
+ @Override public FullPageId pullPageFromCpBuffer() {
+ long idx = GridUnsafe.getLong(checkpointPool.lastAllocatedIdxPtr);
+
+ long lastIdx = ThreadLocalRandom.current().nextLong(idx / 2, idx);
+
+ while (--lastIdx > 1) {
+ assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+ long relative = checkpointPool.relative(lastIdx);
+
+ long freePageAbsPtr = checkpointPool.absolute(relative);
+
+ FullPageId pageToReplace = PageHeader.fullPageId(freePageAbsPtr);
+
+ if (pageToReplace.pageId() == NULL_PAGE.pageId() ||
pageToReplace.groupId() == NULL_PAGE.groupId())
+ continue;
+
+ if (!isInCheckpoint(pageToReplace))
+ continue;
+
+ return pageToReplace;
+ }
+
+ return NULL_PAGE;
+ }
+
/**
* Gets a collection of all pages currently marked as dirty. Will create a
collection copy.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
index d87044d..5e4c80b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
*/
public class PagePool {
/** Relative pointer chunk index mask. */
- private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
+ static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
/** Address mask to avoid ABA problem. */
private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
@@ -227,7 +227,7 @@ public class PagePool {
* @param pageIdx Page index in the pool.
* @return Relative pointer.
*/
- private long relative(long pageIdx) {
+ long relative(long pageIdx) {
return pageIdx | ((long)idx) << 40;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index cc0f496..4909c92 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -81,8 +81,7 @@ public class PagesWriteThrottle implements
PagesWriteThrottlePolicy {
this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
this.log = log;
- if (!throttleOnlyPagesInCheckpoint)
- assert cpProgress != null : "cpProgress must be not null if ratio
based throttling mode is used";
+ assert throttleOnlyPagesInCheckpoint || cpProgress != null :
"cpProgress must be not null if ratio based throttling mode is used";
}
/** {@inheritDoc} */
@@ -182,10 +181,8 @@ public class PagesWriteThrottle implements
PagesWriteThrottlePolicy {
notInCheckpointBackoffCntr.set(0);
}
- /**
- * @return {@code True} if throttling should be enabled, and {@code False}
otherwise.
- */
- private boolean shouldThrottle() {
+ /** {@inheritDoc} */
+ @Override public boolean shouldThrottle() {
int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize()
* CP_BUF_FILL_THRESHOLD);
return pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
index 5466ae8..9b68acd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
@@ -52,4 +52,11 @@ public interface PagesWriteThrottlePolicy {
* Callback to notify throttling policy checkpoint was finished.
*/
void onFinishCheckpoint();
+
+ /**
+ * @return {@code True} if throttling should be enabled, and {@code False}
otherwise.
+ */
+ default boolean shouldThrottle() {
+ return false;
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 0e86f2b..86699a8 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -267,28 +267,23 @@ public class IgniteThrottlingUnitTest {
}
when(pageMemory2g.checkpointBufferPagesSize()).thenReturn(100);
- when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(70);
+
+ AtomicInteger checkpointBufferPagesCount = new AtomicInteger(70);
+
+ when(pageMemory2g.checkpointBufferPagesCount()).thenAnswer(mock ->
checkpointBufferPagesCount.get());
try {
loadThreads.forEach(Thread::start);
- for (int i = 0; i < 100_000; i++)
+ for (int i = 0; i < 1_000; i++)
loadThreads.forEach(LockSupport::unpark);
// Awaiting that all load threads are parked.
for (Thread t : loadThreads)
assertTrue(t.getName(), waitForCondition(() -> t.getState() ==
TIMED_WAITING, 500L));
- plc.tryWakeupThrottledThreads();
-
- // Threads shouldn't wakeup because of throttling enabled.
- for (Thread t : loadThreads)
- assertEquals(t.getName(), TIMED_WAITING, t.getState());
-
// Disable throttling
- when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(50);
-
- plc.tryWakeupThrottledThreads();
+ checkpointBufferPagesCount.set(50);
// Awaiting that all load threads are unparked.
for (Thread t : loadThreads)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 2fc8cbd..f593b26 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.pagemem;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -57,6 +58,7 @@ import
org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
@@ -65,6 +67,7 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.mockito.Mockito;
@@ -91,7 +94,7 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
*/
@Test
public void testThatAllocationTooMuchPagesCauseToOOMException() throws
Exception {
- PageMemoryImpl memory =
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
+ PageMemoryImpl memory =
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED, null);
try {
while (!Thread.currentThread().isInterrupted())
@@ -109,7 +112,7 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
*/
@Test
public void testCheckpointBufferOverusageDontCauseWriteLockLeak() throws
Exception {
- PageMemoryImpl memory =
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
+ PageMemoryImpl memory =
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED, null);
List<FullPageId> pages = new ArrayList<>();
@@ -185,6 +188,33 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
}
/**
+ * Tests that the checkpoint buffer is emptied with priority when CHECKPOINT_BUFFER_ONLY throttling is enabled.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testThrottlingEmptifyCpBufFirst() throws Exception {
+
runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY);
+ }
+
+ /**
+ * Tests that the checkpoint buffer is emptied with priority when SPEED_BASED throttling is enabled.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testThrottlingEmptifyCpBufFirstSpeedBased() throws Exception {
+
runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy.SPEED_BASED);
+ }
+
+ /**
+ * Tests that the checkpoint buffer is emptied with priority when TARGET_RATIO_BASED throttling is enabled.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testThrottlingEmptifyCpBufFirstRatioBased() throws Exception {
+
runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED);
+ }
+
+ /**
* @throws Exception if failed.
*/
@Test
@@ -196,7 +226,8 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
1,
PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
pageStoreMgr,
- pageStoreMgr
+ pageStoreMgr,
+ null
);
int initPageCnt = 10;
@@ -236,6 +267,49 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
}
/**
+ * @throws Exception if failed.
+ */
+ public void runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy
plc) throws Exception {
+ TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+ final List<FullPageId> allocated = new ArrayList<>();
+
+ int pagesForStartThrottling = 10;
+
+ // Create a 1 mb page memory.
+ PageMemoryImpl memory = createPageMemory(
+ 1,
+ plc,
+ pageStoreMgr,
+ pageStoreMgr,
+ new IgniteInClosure<FullPageId>() {
+ @Override public void apply(FullPageId fullPageId) {
+ assertTrue(allocated.contains(fullPageId));
+ }
+ }
+ );
+
+ assert pagesForStartThrottling < memory.checkpointBufferPagesSize() /
3;
+
+ for (int i = 0; i < pagesForStartThrottling +
(memory.checkpointBufferPagesSize() * 2 / 3); i++) {
+ long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+ FullPageId fullId = new FullPageId(id, 1);
+
+ allocated.add(fullId);
+
+ writePage(memory, fullId, (byte)1);
+ }
+
+ GridMultiCollectionWrapper<FullPageId> markedPages =
memory.beginCheckpoint(new GridFinishedFuture());
+
+ for (int i = 0; i < 10 + (memory.checkpointBufferPagesSize() * 2 / 3);
i++)
+ writePage(memory, allocated.get(i), (byte)1);
+
+ doCheckpoint(markedPages, memory, pageStoreMgr);
+ }
+
+ /**
* @param cpPages Checkpoint pages acuiqred by {@code beginCheckpoint()}.
* @param memory Page memory.
* @param pageStoreMgr Test page store manager.
@@ -258,6 +332,21 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
ByteBuffer buf = ByteBuffer.wrap(data);
memory.checkpointWritePage(cpPage, buf, pageStoreWriter, null);
+
+ while (memory.shouldThrottle()) {
+ FullPageId cpPageId = memory.pullPageFromCpBuffer();
+
+ if (cpPageId.equals(FullPageId.NULL_PAGE))
+ break;
+
+ ByteBuffer tmpWriteBuf =
ByteBuffer.allocateDirect(memory.pageSize());
+
+ tmpWriteBuf.order(ByteOrder.nativeOrder());
+
+ tmpWriteBuf.rewind();
+
+ memory.checkpointWritePage(cpPageId, tmpWriteBuf,
pageStoreWriter, null);
+ }
}
memory.finishCheckpoint();
@@ -275,7 +364,8 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
1,
PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
pageStoreMgr,
- pageStoreMgr);
+ pageStoreMgr,
+ null);
int initPageCnt = 500;
@@ -352,7 +442,7 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy
plc) throws Exception {
- PageMemoryImpl memory = createPageMemory(plc);
+ PageMemoryImpl memory = createPageMemory(plc, null);
List<FullPageId> pages = new ArrayList<>();
@@ -456,14 +546,16 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
* @throws Exception If creating mock failed.
*/
private PageMemoryImpl createPageMemory(
- PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+ PageMemoryImpl.ThrottlingPolicy throttlingPlc,
+ @Nullable IgniteInClosure<FullPageId> cpBufChecker) throws Exception {
return createPageMemory(
MAX_SIZE,
throttlingPlc,
new NoOpPageStoreManager(),
(fullPageId, byteBuf, tag) -> {
assert false : "No page replacement (rotation with disk)
should happen during the test";
- });
+ },
+ cpBufChecker);
}
/**
@@ -474,14 +566,15 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
int maxSize,
PageMemoryImpl.ThrottlingPolicy throttlingPlc,
IgnitePageStoreManager mgr,
- PageStoreWriter replaceWriter
+ PageStoreWriter replaceWriter,
+ @Nullable IgniteInClosure<FullPageId> cpBufChecker
) throws Exception {
long[] sizes = new long[5];
for (int i = 0; i < sizes.length; i++)
sizes[i] = maxSize * MB / 4;
- sizes[4] = 5 * MB;
+ sizes[4] = maxSize * MB / 4;
DirectMemoryProvider provider = new UnsafeMemoryProvider(log);
@@ -536,7 +629,7 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
Mockito.when(noThrottle.syncedPagesCounter()).thenReturn(new
AtomicInteger(1_000_000));
Mockito.when(noThrottle.writtenPagesCounter()).thenReturn(new
AtomicInteger(1_000_000));
- PageMemoryImpl mem = new PageMemoryImpl(
+ PageMemoryImpl mem = cpBufChecker == null ? new PageMemoryImpl(
provider,
sizes,
sharedCtx,
@@ -555,7 +648,34 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
NO_OP_METRICS),
throttlingPlc,
noThrottle
- );
+ ): new PageMemoryImpl(
+ provider,
+ sizes,
+ sharedCtx,
+ PAGE_SIZE,
+ replaceWriter,
+ new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
+ @Override public void applyx(Long page, FullPageId fullId,
PageMemoryEx pageMem) {
+ }
+ }, new CheckpointLockStateChecker() {
+ @Override public boolean checkpointLockIsHeldByThread() {
+ return true;
+ }
+ },
+ new
DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration(),
+ kernalCtx.metric(),
+ NO_OP_METRICS),
+ throttlingPlc,
+ noThrottle
+ ) {
+ @Override public FullPageId pullPageFromCpBuffer() {
+ FullPageId pageId = super.pullPageFromCpBuffer();
+
+ cpBufChecker.apply(pageId);
+
+ return pageId;
+ }
+ };
mem.metrics().enableMetrics();