This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 04999c5  IGNITE-12452 Write pages from checkpoint buffer with higher 
priority - Fixes #7145.
04999c5 is described below

commit 04999c518d4f22895247a7686879f2cdfdbcba4f
Author: Evgeny Stanilovskiy <[email protected]>
AuthorDate: Tue Dec 24 18:22:42 2019 +0300

    IGNITE-12452 Write pages from checkpoint buffer with higher priority - 
Fixes #7145.
---
 .../apache/ignite/internal/pagemem/FullPageId.java |   3 +
 .../GridCacheDatabaseSharedManager.java            |  48 +++----
 .../cache/persistence/pagemem/PageMemoryEx.java    |  10 ++
 .../cache/persistence/pagemem/PageMemoryImpl.java  |  40 ++++++
 .../cache/persistence/pagemem/PagePool.java        |   4 +-
 .../persistence/pagemem/PagesWriteThrottle.java    |   9 +-
 .../pagemem/PagesWriteThrottlePolicy.java          |   7 +
 .../pagemem/IgniteThrottlingUnitTest.java          |  17 +--
 .../persistence/pagemem/PageMemoryImplTest.java    | 142 +++++++++++++++++++--
 9 files changed, 227 insertions(+), 53 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
index 4c7d031..588f7c4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/FullPageId.java
@@ -49,6 +49,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  * Effective page ID is page ID with zeroed bits used for page ID rotation.
  */
 public class FullPageId {
+    /** */
+    public static final FullPageId NULL_PAGE = new FullPageId(-1, -1);
+
     /** Page ID. */
     private final long pageId;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a89919b..33a307c 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -4788,30 +4788,15 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 List<FullPageId> pagesToRetry = writePages(writePageIds);
 
                 if (pagesToRetry.isEmpty())
-                    doneFut.onDone((Void)null);
+                    doneFut.onDone();
                 else {
                     LT.warn(log, pagesToRetry.size() + " checkpoint pages were 
not written yet due to unsuccessful " +
                         "page write lock acquisition and will be retried");
 
-                    if (retryWriteExecutor == null) {
-                        while (!pagesToRetry.isEmpty())
-                            pagesToRetry = writePages(pagesToRetry);
+                    while (!pagesToRetry.isEmpty())
+                        pagesToRetry = writePages(pagesToRetry);
 
-                        doneFut.onDone((Void)null);
-                    }
-                    else {
-                        // Submit current retry pages to the end of the queue 
to avoid starvation.
-                        WriteCheckpointPages retryWritesTask = new 
WriteCheckpointPages(
-                            tracker,
-                            pagesToRetry,
-                            updStores,
-                            doneFut,
-                            totalPagesToWrite,
-                            beforePageWrite,
-                            retryWriteExecutor);
-
-                        retryWriteExecutor.submit(retryWritesTask);
-                    }
+                    doneFut.onDone();
                 }
             }
             catch (Throwable e) {
@@ -4832,16 +4817,14 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
             ByteBuffer tmpWriteBuf = threadBuf.get();
 
+            boolean throttlingEnabled = resolveThrottlingPolicy() != 
PageMemoryImpl.ThrottlingPolicy.DISABLED;
+
             for (FullPageId fullId : writePageIds) {
                 if (checkpointer.shutdownNow)
                     break;
 
-                tmpWriteBuf.rewind();
-
                 beforePageWrite.run();
 
-                snapshotMgr.beforePageWrite(fullId);
-
                 int grpId = fullId.groupId();
 
                 PageMemoryEx pageMem;
@@ -4862,7 +4845,26 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     pageMem = (PageMemoryEx)region.pageMemory();
                 }
 
+                snapshotMgr.beforePageWrite(fullId);
+
+                tmpWriteBuf.rewind();
+
                 pageMem.checkpointWritePage(fullId, tmpWriteBuf, 
pageStoreWriter, tracker);
+
+                if (throttlingEnabled) {
+                    while (pageMem.shouldThrottle()) {
+                        FullPageId cpPageId = pageMem.pullPageFromCpBuffer();
+
+                        if (cpPageId.equals(FullPageId.NULL_PAGE))
+                            break;
+
+                        snapshotMgr.beforePageWrite(cpPageId);
+
+                        tmpWriteBuf.rewind();
+
+                        pageMem.checkpointWritePage(cpPageId, tmpWriteBuf, 
pageStoreWriter, tracker);
+                    }
+                }
             }
 
             return pagesToRetry;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index f9fdb0d..8465783 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -165,4 +165,14 @@ public interface PageMemoryEx extends PageMemory {
      * @return Future that will be completed when all pages are cleared.
      */
     public IgniteInternalFuture<Void> clearAsync(LoadedPagesMap.KeyPredicate 
pred, boolean cleanDirty);
+
+    /**
+     * Pull page from checkpoint buffer.
+     */
+    public FullPageId pullPageFromCpBuffer();
+
+    /**
+     * Calculates throttling condition.
+     */
+    public boolean shouldThrottle();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index b0cc7bd..2f90d41 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -95,6 +95,8 @@ import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.internal.pagemem.FullPageId.NULL_PAGE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PagePool.SEGMENT_INDEX_MASK;
 import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
 
 /**
@@ -1265,6 +1267,8 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                 copyInBuffer(tmpAbsPtr, buf);
 
+                PageHeader.fullPageId(tmpAbsPtr, NULL_PAGE);
+
                 GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), 
(byte)0);
 
                 if (tracker != null)
@@ -1616,6 +1620,8 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             PageHeader.dirty(absPtr, false);
             PageHeader.tempBufferPointer(absPtr, tmpRelPtr);
+            // info for checkpoint buffer cleaner.
+            PageHeader.fullPageId(tmpAbsPtr, fullId);
 
             assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
             assert PageIO.getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; //TODO 
GG-11480
@@ -1847,6 +1853,40 @@ public class PageMemoryImpl implements PageMemoryEx {
         return memMetrics;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean shouldThrottle() {
+        return writeThrottle.shouldThrottle();
+    }
+
+    /**
+     * Get arbitrary page from cp buffer.
+     */
+    @Override public FullPageId pullPageFromCpBuffer() {
+        long idx = GridUnsafe.getLong(checkpointPool.lastAllocatedIdxPtr);
+
+        long lastIdx = ThreadLocalRandom.current().nextLong(idx / 2, idx);
+
+        while (--lastIdx > 1) {
+            assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+            long relative = checkpointPool.relative(lastIdx);
+
+            long freePageAbsPtr = checkpointPool.absolute(relative);
+
+            FullPageId pageToReplace = PageHeader.fullPageId(freePageAbsPtr);
+
+            if (pageToReplace.pageId() == NULL_PAGE.pageId() || 
pageToReplace.groupId() == NULL_PAGE.groupId())
+                continue;
+
+            if (!isInCheckpoint(pageToReplace))
+                continue;
+
+            return pageToReplace;
+        }
+
+        return NULL_PAGE;
+    }
+
     /**
      * Gets a collection of all pages currently marked as dirty. Will create a 
collection copy.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
index d87044d..5e4c80b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  */
 public class PagePool {
     /** Relative pointer chunk index mask. */
-    private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
+    static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
 
     /** Address mask to avoid ABA problem. */
     private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
@@ -227,7 +227,7 @@ public class PagePool {
      * @param pageIdx Page index in the pool.
      * @return Relative pointer.
      */
-    private long relative(long pageIdx) {
+    long relative(long pageIdx) {
         return pageIdx | ((long)idx) << 40;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index cc0f496..4909c92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -81,8 +81,7 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
         this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
         this.log = log;
 
-        if (!throttleOnlyPagesInCheckpoint)
-            assert cpProgress != null : "cpProgress must be not null if ratio 
based throttling mode is used";
+        assert throttleOnlyPagesInCheckpoint || cpProgress != null : 
"cpProgress must be not null if ratio based throttling mode is used";
     }
 
     /** {@inheritDoc} */
@@ -182,10 +181,8 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
         notInCheckpointBackoffCntr.set(0);
     }
 
-    /**
-     * @return {@code True} if throttling should be enabled, and {@code False} 
otherwise.
-     */
-    private boolean shouldThrottle() {
+    /** {@inheritDoc} */
+    @Override public boolean shouldThrottle() {
         int checkpointBufLimit = (int)(pageMemory.checkpointBufferPagesSize() 
* CP_BUF_FILL_THRESHOLD);
 
         return pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
index 5466ae8..9b68acd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
@@ -52,4 +52,11 @@ public interface PagesWriteThrottlePolicy {
      * Callback to notify throttling policy checkpoint was finished.
      */
     void onFinishCheckpoint();
+
+    /**
+     * @return {@code True} if throttling should be enabled, and {@code False} 
otherwise.
+     */
+    default boolean shouldThrottle() {
+        return false;
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 0e86f2b..86699a8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -267,28 +267,23 @@ public class IgniteThrottlingUnitTest {
         }
 
         when(pageMemory2g.checkpointBufferPagesSize()).thenReturn(100);
-        when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(70);
+
+        AtomicInteger checkpointBufferPagesCount = new AtomicInteger(70);
+
+        when(pageMemory2g.checkpointBufferPagesCount()).thenAnswer(mock -> 
checkpointBufferPagesCount.get());
 
         try {
             loadThreads.forEach(Thread::start);
 
-            for (int i = 0; i < 100_000; i++)
+            for (int i = 0; i < 1_000; i++)
                 loadThreads.forEach(LockSupport::unpark);
 
             // Awaiting that all load threads are parked.
             for (Thread t : loadThreads)
                 assertTrue(t.getName(), waitForCondition(() -> t.getState() == 
TIMED_WAITING, 500L));
 
-            plc.tryWakeupThrottledThreads();
-
-            // Threads shouldn't wakeup because of throttling enabled.
-            for (Thread t : loadThreads)
-                assertEquals(t.getName(), TIMED_WAITING, t.getState());
-
             // Disable throttling
-            when(pageMemory2g.checkpointBufferPagesCount()).thenReturn(50);
-
-            plc.tryWakeupThrottledThreads();
+            checkpointBufferPagesCount.set(50);
 
             // Awaiting that all load threads are unparked.
             for (Thread t : loadThreads)
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 2fc8cbd..f593b26 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,6 +58,7 @@ import 
org.apache.ignite.internal.util.GridMultiCollectionWrapper;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
 import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
@@ -65,6 +67,7 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -91,7 +94,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testThatAllocationTooMuchPagesCauseToOOMException() throws 
Exception {
-        PageMemoryImpl memory = 
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
+        PageMemoryImpl memory = 
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED, null);
 
         try {
             while (!Thread.currentThread().isInterrupted())
@@ -109,7 +112,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testCheckpointBufferOverusageDontCauseWriteLockLeak() throws 
Exception {
-        PageMemoryImpl memory = 
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
+        PageMemoryImpl memory = 
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED, null);
 
         List<FullPageId> pages = new ArrayList<>();
 
@@ -185,6 +188,33 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Tests that with throttling enabled emptify cp buffer primarily with 
enabled CHECKPOINT_BUFFER_ONLY throttling.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testThrottlingEmptifyCpBufFirst() throws Exception {
+        
runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY);
+    }
+
+    /**
+     * Tests that with throttling enabled emptify cp buffer primarily with 
enabled SPEED_BASED throttling.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testThrottlingEmptifyCpBufFirstSpeedBased() throws Exception {
+        
runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy.SPEED_BASED);
+    }
+
+    /**
+     * Tests that with throttling enabled emptify cp buffer primarily with 
enabled TARGET_RATIO_BASED throttling.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testThrottlingEmptifyCpBufFirstRatioBased() throws Exception {
+        
runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED);
+    }
+
+    /**
      * @throws Exception if failed.
      */
     @Test
@@ -196,7 +226,8 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             1,
             PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
             pageStoreMgr,
-            pageStoreMgr
+            pageStoreMgr,
+            null
         );
 
         int initPageCnt = 10;
@@ -236,6 +267,49 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception if failed.
+     */
+    public void runThrottlingEmptifyCpBufFirst(PageMemoryImpl.ThrottlingPolicy 
plc) throws Exception {
+        TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+        final List<FullPageId> allocated = new ArrayList<>();
+
+        int pagesForStartThrottling = 10;
+
+        // Create a 1 mb page memory.
+        PageMemoryImpl memory = createPageMemory(
+            1,
+            plc,
+            pageStoreMgr,
+            pageStoreMgr,
+            new IgniteInClosure<FullPageId>() {
+                @Override public void apply(FullPageId fullPageId) {
+                    assertTrue(allocated.contains(fullPageId));
+                }
+            }
+        );
+
+        assert pagesForStartThrottling < memory.checkpointBufferPagesSize() / 
3;
+
+        for (int i = 0; i < pagesForStartThrottling + 
(memory.checkpointBufferPagesSize() * 2 / 3); i++) {
+            long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+            FullPageId fullId = new FullPageId(id, 1);
+
+            allocated.add(fullId);
+
+            writePage(memory, fullId, (byte)1);
+        }
+
+        GridMultiCollectionWrapper<FullPageId> markedPages = 
memory.beginCheckpoint(new GridFinishedFuture());
+
+        for (int i = 0; i < 10 + (memory.checkpointBufferPagesSize() * 2 / 3); 
i++)
+            writePage(memory, allocated.get(i), (byte)1);
+
+        doCheckpoint(markedPages, memory, pageStoreMgr);
+    }
+
+    /**
      * @param cpPages Checkpoint pages acuiqred by {@code beginCheckpoint()}.
      * @param memory Page memory.
      * @param pageStoreMgr Test page store manager.
@@ -258,6 +332,21 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             ByteBuffer buf = ByteBuffer.wrap(data);
 
             memory.checkpointWritePage(cpPage, buf, pageStoreWriter, null);
+
+            while (memory.shouldThrottle()) {
+                FullPageId cpPageId = memory.pullPageFromCpBuffer();
+
+                if (cpPageId.equals(FullPageId.NULL_PAGE))
+                    break;
+
+                ByteBuffer tmpWriteBuf = 
ByteBuffer.allocateDirect(memory.pageSize());
+
+                tmpWriteBuf.order(ByteOrder.nativeOrder());
+
+                tmpWriteBuf.rewind();
+
+                memory.checkpointWritePage(cpPageId, tmpWriteBuf, 
pageStoreWriter, null);
+            }
         }
 
         memory.finishCheckpoint();
@@ -275,7 +364,8 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             1,
             PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
             pageStoreMgr,
-            pageStoreMgr);
+            pageStoreMgr,
+            null);
 
         int initPageCnt = 500;
 
@@ -352,7 +442,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void 
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy
 plc) throws Exception {
-        PageMemoryImpl memory = createPageMemory(plc);
+        PageMemoryImpl memory = createPageMemory(plc, null);
 
         List<FullPageId> pages = new ArrayList<>();
 
@@ -456,14 +546,16 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
      * @throws Exception If creating mock failed.
      */
     private PageMemoryImpl createPageMemory(
-        PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+        PageMemoryImpl.ThrottlingPolicy throttlingPlc,
+        @Nullable IgniteInClosure<FullPageId> cpBufChecker) throws Exception {
         return createPageMemory(
             MAX_SIZE,
             throttlingPlc,
             new NoOpPageStoreManager(),
             (fullPageId, byteBuf, tag) -> {
                 assert false : "No page replacement (rotation with disk) 
should happen during the test";
-            });
+            },
+            cpBufChecker);
     }
 
     /**
@@ -474,14 +566,15 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
         int maxSize,
         PageMemoryImpl.ThrottlingPolicy throttlingPlc,
         IgnitePageStoreManager mgr,
-        PageStoreWriter replaceWriter
+        PageStoreWriter replaceWriter,
+        @Nullable IgniteInClosure<FullPageId> cpBufChecker
     ) throws Exception {
         long[] sizes = new long[5];
 
         for (int i = 0; i < sizes.length; i++)
             sizes[i] = maxSize * MB / 4;
 
-        sizes[4] = 5 * MB;
+        sizes[4] = maxSize * MB / 4;
 
         DirectMemoryProvider provider = new UnsafeMemoryProvider(log);
 
@@ -536,7 +629,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
         Mockito.when(noThrottle.syncedPagesCounter()).thenReturn(new 
AtomicInteger(1_000_000));
         Mockito.when(noThrottle.writtenPagesCounter()).thenReturn(new 
AtomicInteger(1_000_000));
 
-        PageMemoryImpl mem = new PageMemoryImpl(
+        PageMemoryImpl mem = cpBufChecker == null ? new PageMemoryImpl(
             provider,
             sizes,
             sharedCtx,
@@ -555,7 +648,34 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
                 NO_OP_METRICS),
             throttlingPlc,
             noThrottle
-        );
+        ): new PageMemoryImpl(
+            provider,
+            sizes,
+            sharedCtx,
+            PAGE_SIZE,
+            replaceWriter,
+            new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
+                @Override public void applyx(Long page, FullPageId fullId, 
PageMemoryEx pageMem) {
+                }
+            }, new CheckpointLockStateChecker() {
+            @Override public boolean checkpointLockIsHeldByThread() {
+                return true;
+            }
+        },
+            new 
DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration(),
+                kernalCtx.metric(),
+                NO_OP_METRICS),
+            throttlingPlc,
+            noThrottle
+        ) {
+            @Override public FullPageId pullPageFromCpBuffer() {
+                FullPageId pageId = super.pullPageFromCpBuffer();
+
+                cpBufChecker.apply(pageId);
+
+                return pageId;
+            }
+        };
 
         mem.metrics().enableMetrics();
 

Reply via email to