Repository: ignite
Updated Branches:
  refs/heads/master 8da7c9e1b -> 05d58bb6e


IGNITE-7751 Use throttling to protect from checkpoint buffer overflow - Fixes #3611.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05d58bb6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05d58bb6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05d58bb6

Branch: refs/heads/master
Commit: 05d58bb6efe35bfdd8bdb3e74dfd2627f6df7f9c
Parents: 8da7c9e
Author: Ivan Rakov <ivan.glu...@gmail.com>
Authored: Tue Mar 13 19:49:12 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Tue Mar 13 19:49:12 2018 +0300

----------------------------------------------------------------------
 .../internal/pagemem/PageIdAllocator.java       |   4 +-
 .../pagemem/impl/PageMemoryNoStoreImpl.java     |   2 +-
 .../GridCacheDatabaseSharedManager.java         |  39 ++--
 .../persistence/pagemem/PageMemoryImpl.java     | 169 +++++++--------
 .../pagemem/PagesWriteSpeedBasedThrottle.java   |  16 +-
 .../persistence/pagemem/PagesWriteThrottle.java |  42 ++--
 .../pagemem/BPlusTreePageMemoryImplTest.java    |   2 +-
 .../BPlusTreeReuseListPageMemoryImplTest.java   |   3 +-
 ...gnitePageMemReplaceDelayedWriteUnitTest.java |   5 +-
 .../pagemem/IgniteThrottlingUnitTest.java       |   1 -
 .../pagemem/IndexStoragePageMemoryImplTest.java |   3 +-
 .../pagemem/PageMemoryImplNoLoadTest.java       |   3 +-
 .../persistence/pagemem/PageMemoryImplTest.java | 211 ++++++++++++++++++-
 13 files changed, 363 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
index ae7da14..c6aeabe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
@@ -38,11 +38,11 @@ public interface PageIdAllocator {
     /**
      * Allocates a page from the space for the given partition ID and the 
given flags.
      *
-     * @param cacheId Cache Group ID.
+     * @param grpId Cache Group ID.
      * @param partId Partition ID.
      * @return Allocated page ID.
      */
-    public long allocatePage(int cacheId, int partId, byte flags) throws 
IgniteCheckedException;
+    public long allocatePage(int grpId, int partId, byte flags) throws 
IgniteCheckedException;
 
     /**
      * The given page is free now.

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
index af1555e..7424af6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
@@ -254,7 +254,7 @@ public class PageMemoryNoStoreImpl implements PageMemory {
     }
 
     /** {@inheritDoc} */
-    @Override public long allocatePage(int cacheId, int partId, byte flags) {
+    @Override public long allocatePage(int grpId, int partId, byte flags) {
         memMetrics.incrementTotalAllocatedPages();
 
         long relPtr = borrowFreePage();

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d6a8a30..997f89a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -193,6 +193,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     private final int walRebalanceThreshold = 
IgniteSystemProperties.getInteger(
         IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
 
+    /** Value of property for throttling policy override. */
+    private final String throttlingPolicyOverride = 
IgniteSystemProperties.getString(
+        IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
+
     /** Checkpoint lock hold count. */
     private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = new 
ThreadLocal<Integer>() {
         @Override protected Integer initialValue() {
@@ -932,19 +936,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             chpBufSize = cacheSize;
         }
 
-        PageMemoryImpl.ThrottlingPolicy plc = 
persistenceCfg.isWriteThrottlingEnabled()
-            ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
-            : PageMemoryImpl.ThrottlingPolicy.NONE;
-
-        String val = 
IgniteSystemProperties.getString(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
-
-        if (val != null) {
-            if ("ratio".equalsIgnoreCase(val))
-                plc = PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED;
-            else if ("speed".equalsIgnoreCase(val) || Boolean.valueOf(val))
-                plc = PageMemoryImpl.ThrottlingPolicy.SPEED_BASED;
-        }
-
         GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
 
         if (trackable)
@@ -985,7 +976,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             changeTracker,
             this,
             memMetrics,
-            plc,
+            resolveThrottlingPolicy(),
             this
         );
 
@@ -994,6 +985,26 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         return pageMem;
     }
 
+    /**
+     * Resolves throttling policy according to the settings.
+     */
+    @NotNull private PageMemoryImpl.ThrottlingPolicy resolveThrottlingPolicy() 
{
+        PageMemoryImpl.ThrottlingPolicy plc = 
persistenceCfg.isWriteThrottlingEnabled()
+            ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
+            : PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY;
+
+        if (throttlingPolicyOverride != null) {
+            try {
+                plc = 
PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPolicyOverride.toUpperCase());
+            }
+            catch (IllegalArgumentException e) {
+                log.error("Incorrect value of 
IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED property: " +
+                    throttlingPolicyOverride + ". Default throttling policy " 
+ plc + " will be used.");
+            }
+        }
+        return plc;
+    }
+
     /** {@inheritDoc} */
     @Override protected void 
checkRegionEvictionProperties(DataRegionConfiguration regCfg, 
DataStorageConfiguration dbCfg)
         throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index fa10a1f..9f979f5 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.lang.Boolean.FALSE;
@@ -147,7 +148,7 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Page ID offset */
     public static final int PAGE_ID_OFFSET = 16;
 
-    /** Page cache ID offset. */
+    /** Page cache group ID offset. */
     public static final int PAGE_CACHE_ID_OFFSET = 24;
 
     /** Page pin counter offset. */
@@ -163,7 +164,7 @@ public class PageMemoryImpl implements PageMemoryEx {
      * 8b Marker/timestamp
      * 8b Relative pointer
      * 8b Page ID
-     * 4b Cache ID
+     * 4b Cache group ID
      * 4b Pin count
      * 8b Lock
      * 8b Temporary buffer
@@ -176,6 +177,10 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Tracking io. */
     private static final TrackingPageIO trackingIO = 
TrackingPageIO.VERSIONS.latest();
 
+    /** Checkpoint pool overflow error message. */
+    public static final String CHECKPOINT_POOL_OVERFLOW_ERROR_MSG = "Failed to 
allocate temporary buffer for checkpoint " +
+        "(increase checkpointPageBufferSize configuration property)";
+
     /** Page size. */
     private final int sysPageSize;
 
@@ -277,7 +282,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         CheckpointLockStateChecker stateChecker,
         DataRegionMetricsImpl memMetrics,
         @Nullable ThrottlingPolicy throttlingPlc,
-        @Nullable CheckpointWriteProgressSupplier cpProgressProvider
+        @NotNull CheckpointWriteProgressSupplier cpProgressProvider
     ) {
         assert ctx != null;
         assert pageSize > 0;
@@ -294,7 +299,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 null;
         this.changeTracker = changeTracker;
         this.stateChecker = stateChecker;
-        this.throttlingPlc = throttlingPlc != null ? throttlingPlc : 
ThrottlingPolicy.NONE;
+        this.throttlingPlc = throttlingPlc != null ? throttlingPlc : 
ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY;
         this.cpProgressProvider = cpProgressProvider;
 
         storeMgr = ctx.pageStore();
@@ -363,22 +368,15 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /**
-     *
+     * Resolves instance of {@link PagesWriteThrottlePolicy} according to 
chosen throttle policy.
      */
     private void initWriteThrottle() {
-        if (!isThrottlingEnabled())
-            return;
-
-        if (cpProgressProvider == null) {
-            log.error("Write throttle can't start. CP progress provider not 
presented");
-
-            throttlingPlc = ThrottlingPolicy.NONE;
-        }
-
         if (throttlingPlc == ThrottlingPolicy.SPEED_BASED)
             writeThrottle = new PagesWriteSpeedBasedThrottle(this, 
cpProgressProvider, stateChecker, log);
-        else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
-            writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, 
stateChecker);
+        else if (throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
+            writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, 
stateChecker, false);
+        else if (throttlingPlc == ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY)
+            writeThrottle = new PagesWriteThrottle(this, null, stateChecker, 
true);
     }
 
     /** {@inheritDoc} */
@@ -398,8 +396,8 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public void releasePage(int cacheId, long pageId, long page) {
-        Segment seg = segment(cacheId, pageId);
+    @Override public void releasePage(int grpId, long pageId, long page) {
+        Segment seg = segment(grpId, pageId);
 
         seg.readLock().lock();
 
@@ -412,18 +410,18 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public long readLock(int cacheId, long pageId, long page) {
-        return readLockPage(page, new FullPageId(pageId, cacheId), false);
+    @Override public long readLock(int grpId, long pageId, long page) {
+        return readLockPage(page, new FullPageId(pageId, grpId), false);
     }
 
     /** {@inheritDoc} */
-    @Override public void readUnlock(int cacheId, long pageId, long page) {
+    @Override public void readUnlock(int grpId, long pageId, long page) {
         readUnlockPage(page);
     }
 
     /** {@inheritDoc} */
-    @Override public long writeLock(int cacheId, long pageId, long page) {
-        return writeLock(cacheId, pageId, page, false);
+    @Override public long writeLock(int grpId, long pageId, long page) {
+        return writeLock(grpId, pageId, page, false);
     }
 
     /** {@inheritDoc} */
@@ -432,14 +430,14 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public long tryWriteLock(int cacheId, long pageId, long page) {
-        return tryWriteLockPage(page, new FullPageId(pageId, cacheId), true);
+    @Override public long tryWriteLock(int grpId, long pageId, long page) {
+        return tryWriteLockPage(page, new FullPageId(pageId, grpId), true);
     }
 
     /** {@inheritDoc} */
-    @Override public void writeUnlock(int cacheId, long pageId, long page, 
Boolean walPlc,
+    @Override public void writeUnlock(int grpId, long pageId, long page, 
Boolean walPlc,
         boolean dirtyFlag) {
-        writeUnlock(cacheId, pageId, page, walPlc, dirtyFlag, false);
+        writeUnlock(grpId, pageId, page, walPlc, dirtyFlag, false);
     }
 
     /** {@inheritDoc} */
@@ -449,12 +447,12 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isDirty(int cacheId, long pageId, long page) {
+    @Override public boolean isDirty(int grpId, long pageId, long page) {
         return isDirty(page);
     }
 
     /** {@inheritDoc} */
-    @Override public long allocatePage(int cacheId, int partId, byte flags) 
throws IgniteCheckedException {
+    @Override public long allocatePage(int grpId, int partId, byte flags) 
throws IgniteCheckedException {
         assert flags == PageIdAllocator.FLAG_DATA && partId <= 
PageIdAllocator.MAX_PARTITION_ID ||
             flags == PageIdAllocator.FLAG_IDX && partId == 
PageIdAllocator.INDEX_PARTITION :
             "flags = " + flags + ", partId = " + partId;
@@ -464,19 +462,19 @@ public class PageMemoryImpl implements PageMemoryEx {
         if (isThrottlingEnabled())
             writeThrottle.onMarkDirty(false);
 
-        long pageId = storeMgr.allocatePage(cacheId, partId, flags);
+        long pageId = storeMgr.allocatePage(grpId, partId, flags);
 
         assert PageIdUtils.pageIndex(pageId) > 0; //it's crucial for tracking 
pages (zero page is super one)
 
         // We need to allocate page in memory for marking it dirty to save it 
in the next checkpoint.
         // Otherwise it is possible that on file will be empty page which will 
be saved at snapshot and read with error
         // because there is no crc inside them.
-        Segment seg = segment(cacheId, pageId);
+        Segment seg = segment(grpId, pageId);
 
         DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != 
null
             ? delayedPageReplacementTracker.delayedPageWrite() : null;
 
-        FullPageId fullId = new FullPageId(pageId, cacheId);
+        FullPageId fullId = new FullPageId(pageId, grpId);
 
         seg.writeLock().lock();
 
@@ -484,15 +482,15 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         try {
             long relPtr = seg.loadedPages.get(
-                cacheId,
+                grpId,
                 PageIdUtils.effectivePageId(pageId),
-                seg.partGeneration(cacheId, partId),
+                seg.partGeneration(grpId, partId),
                 INVALID_REL_PTR,
                 OUTDATED_REL_PTR
             );
 
             if (relPtr == OUTDATED_REL_PTR)
-                relPtr = refreshOutdatedPage(seg, cacheId, pageId, false);
+                relPtr = refreshOutdatedPage(seg, grpId, pageId, false);
 
             if (relPtr == INVALID_REL_PTR)
                 relPtr = seg.borrowOrAllocateFreePage(pageId);
@@ -528,7 +526,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                         if (!ctx.wal().isAlwaysWriteFullPages())
                             ctx.wal().log(
                                 new InitNewPageRecord(
-                                    cacheId,
+                                    grpId,
                                     pageId,
                                     trackingIO.getType(),
                                     trackingIO.getVersion(), pageId
@@ -539,7 +537,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 }
             }
 
-            seg.loadedPages.put(cacheId, PageIdUtils.effectivePageId(pageId), 
relPtr, seg.partGeneration(cacheId, partId));
+            seg.loadedPages.put(grpId, PageIdUtils.effectivePageId(pageId), 
relPtr, seg.partGeneration(grpId, partId));
         }
         catch (IgniteOutOfMemoryException oom) {
             DataRegionConfiguration dataRegionCfg = 
getDataRegionConfiguration();
@@ -562,7 +560,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
 
         //we have allocated 'tracking' page, we need to allocate regular one
-        return isTrackingPage ? allocatePage(cacheId, partId, flags) : pageId;
+        return isTrackingPage ? allocatePage(grpId, partId, flags) : pageId;
     }
 
     /**
@@ -596,7 +594,7 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean freePage(int cacheId, long pageId) throws 
IgniteCheckedException {
+    @Override public boolean freePage(int grpId, long pageId) throws 
IgniteCheckedException {
         assert false : "Free page should be never called directly when 
persistence is enabled.";
 
         return false;
@@ -613,25 +611,25 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public long acquirePage(int cacheId, long pageId) throws 
IgniteCheckedException {
-        return acquirePage(cacheId, pageId, false);
+    @Override public long acquirePage(int grpId, long pageId) throws 
IgniteCheckedException {
+        return acquirePage(grpId, pageId, false);
     }
 
     /** {@inheritDoc} */
-    @Override public long acquirePage(int cacheId, long pageId, boolean 
restore) throws IgniteCheckedException {
-        FullPageId fullId = new FullPageId(pageId, cacheId);
+    @Override public long acquirePage(int grpId, long pageId, boolean restore) 
throws IgniteCheckedException {
+        FullPageId fullId = new FullPageId(pageId, grpId);
 
         int partId = PageIdUtils.partId(pageId);
 
-        Segment seg = segment(cacheId, pageId);
+        Segment seg = segment(grpId, pageId);
 
         seg.readLock().lock();
 
         try {
             long relPtr = seg.loadedPages.get(
-                cacheId,
+                grpId,
                 PageIdUtils.effectivePageId(pageId),
-                seg.partGeneration(cacheId, partId),
+                seg.partGeneration(grpId, partId),
                 INVALID_REL_PTR,
                 INVALID_REL_PTR
             );
@@ -660,9 +658,9 @@ public class PageMemoryImpl implements PageMemoryEx {
         try {
             // Double-check.
             long relPtr = seg.loadedPages.get(
-                cacheId,
+                grpId,
                 PageIdUtils.effectivePageId(pageId),
-                seg.partGeneration(cacheId, partId),
+                seg.partGeneration(grpId, partId),
                 INVALID_REL_PTR,
                 OUTDATED_REL_PTR
             );
@@ -688,10 +686,10 @@ public class PageMemoryImpl implements PageMemoryEx {
                 setDirty(fullId, absPtr, false, false);
 
                 seg.loadedPages.put(
-                    cacheId,
+                    grpId,
                     PageIdUtils.effectivePageId(pageId),
                     relPtr,
-                    seg.partGeneration(cacheId, partId)
+                    seg.partGeneration(grpId, partId)
                 );
 
                 long pageAddr = absPtr + PAGE_OVERHEAD;
@@ -722,7 +720,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             else if (relPtr == OUTDATED_REL_PTR) {
                 assert PageIdUtils.pageIndex(pageId) == 0 : fullId;
 
-                relPtr = refreshOutdatedPage(seg, cacheId, pageId, false);
+                relPtr = refreshOutdatedPage(seg, grpId, pageId, false);
 
                 absPtr = seg.absolute(relPtr);
 
@@ -764,7 +762,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 ByteBuffer buf = wrapPointer(pageAddr, pageSize());
 
                 try {
-                    storeMgr.read(cacheId, pageId, buf);
+                    storeMgr.read(grpId, pageId, buf);
                 }
                 catch (IgniteDataIntegrityViolationException ignore) {
                     U.warn(log, "Failed to read page (data integrity violation 
encountered, will try to " +
@@ -783,17 +781,17 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /**
      * @param seg Segment.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param pageId Page ID.
      * @param rmv {@code True} if page should be removed.
      * @return Relative pointer to refreshed page.
      */
-    private long refreshOutdatedPage(Segment seg, int cacheId, long pageId, 
boolean rmv) {
+    private long refreshOutdatedPage(Segment seg, int grpId, long pageId, 
boolean rmv) {
         assert seg.writeLock().isHeldByCurrentThread();
 
-        int tag = seg.partGeneration(cacheId, PageIdUtils.partId(pageId));
+        int tag = seg.partGeneration(grpId, PageIdUtils.partId(pageId));
 
-        long relPtr = seg.loadedPages.refresh(cacheId, 
PageIdUtils.effectivePageId(pageId), tag);
+        long relPtr = seg.loadedPages.refresh(grpId, 
PageIdUtils.effectivePageId(pageId), tag);
 
         long absPtr = seg.absolute(relPtr);
 
@@ -815,13 +813,13 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
 
         if (rmv)
-            seg.loadedPages.remove(cacheId, 
PageIdUtils.effectivePageId(pageId));
+            seg.loadedPages.remove(grpId, PageIdUtils.effectivePageId(pageId));
 
         if (seg.segCheckpointPages != null)
-            seg.segCheckpointPages.remove(new FullPageId(pageId, cacheId));
+            seg.segCheckpointPages.remove(new FullPageId(pageId, grpId));
 
         if (seg.dirtyPages != null)
-            seg.dirtyPages.remove(new FullPageId(pageId, cacheId));
+            seg.dirtyPages.remove(new FullPageId(pageId, grpId));
 
         return relPtr;
     }
@@ -991,7 +989,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         memMetrics.resetDirtyPages();
 
-        if (isThrottlingEnabled())
+        if (throttlingPlc != ThrottlingPolicy.DISABLED)
             writeThrottle.onBeginCheckpoint();
 
         return new GridMultiCollectionWrapper<>(collections);
@@ -1001,7 +999,7 @@ public class PageMemoryImpl implements PageMemoryEx {
      * @return {@code True} if throttling is enabled.
      */
     private boolean isThrottlingEnabled() {
-        return throttlingPlc != ThrottlingPolicy.NONE;
+        return throttlingPlc != ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY && 
throttlingPlc != ThrottlingPolicy.DISABLED;
     }
 
     /** {@inheritDoc} */
@@ -1013,7 +1011,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         for (Segment seg : segments)
             seg.segCheckpointPages = null;
 
-        if (isThrottlingEnabled())
+        if (throttlingPlc != ThrottlingPolicy.DISABLED)
             writeThrottle.onFinishCheckpoint();
     }
 
@@ -1340,8 +1338,8 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public long readLockForce(int cacheId, long pageId, long page) {
-        return readLockPage(page, new FullPageId(pageId, cacheId), true);
+    @Override public long readLockForce(int grpId, long pageId, long page) {
+        return readLockPage(page, new FullPageId(pageId, grpId), true);
     }
 
     /**
@@ -1400,9 +1398,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             if (tmpRelPtr == INVALID_REL_PTR) {
                 rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, 
OffheapReadWriteLock.TAG_LOCK_ALWAYS);
 
-                throw new IgniteException(
-                    "Failed to allocate temporary buffer for checkpoint " +
-                        "(increase checkpointPageBufferSize configuration 
property)");
+                throw new IgniteException(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG + 
": " + memMetrics.getName());
             }
 
             // Pin the page until checkpoint is not finished.
@@ -1472,7 +1468,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         try {
             rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, 
PageIdUtils.tag(pageId));
 
-            if (isThrottlingEnabled() && !restore && markDirty && !wasDirty)
+            if (throttlingPlc != ThrottlingPolicy.DISABLED && !restore && 
markDirty && !wasDirty)
                 writeThrottle.onMarkDirty(isInCheckpoint(fullId));
         }
         catch (AssertionError ex) {
@@ -1847,7 +1843,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         /** Initial partition generation. */
         private static final int INIT_PART_GENERATION = 1;
 
-        /** Maps partition (cacheId, partId) to its generation. Generation is 
1-based incrementing partition counter. */
+        /** Maps partition (grpId, partId) to its generation. Generation is 
1-based incrementing partition counter. */
         private final Map<GroupPartitionId, Integer> partGenerationMap = new 
HashMap<>();
 
         /** */
@@ -1882,7 +1878,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             pool = new PagePool(idx, poolRegion, null);
 
-            maxDirtyPages = throttlingPlc != ThrottlingPolicy.NONE
+            maxDirtyPages = throttlingPlc != ThrottlingPolicy.DISABLED
                 ? pool.pages() * 3 / 4
                 : Math.min(pool.pages() * 2 / 3, cpPoolPages);
         }
@@ -2599,37 +2595,37 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
 
         /**
-         * Reads cache ID from the page at the given absolute pointer.
+         * Reads cache group ID from the page at the given absolute pointer.
          *
          * @param absPtr Absolute memory pointer to the page header.
-         * @return Cache ID written to the page.
+         * @return Cache group ID written to the page.
          */
-        private static int readPageCacheId(final long absPtr) {
+        private static int readPageGroupId(final long absPtr) {
             return GridUnsafe.getInt(absPtr + PAGE_CACHE_ID_OFFSET);
         }
 
         /**
-         * Writes cache ID from the page at the given absolute pointer.
+         * Writes cache group ID from the page at the given absolute pointer.
          *
          * @param absPtr Absolute memory pointer to the page header.
-         * @param cacheId Cache ID to write.
+         * @param grpId Cache group ID to write.
          */
-        private static void pageCacheId(final long absPtr, final int cacheId) {
-            GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, cacheId);
+        private static void pageGroupId(final long absPtr, final int grpId) {
+            GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, grpId);
         }
 
         /**
-         * Reads page ID and cache ID from the page at the given absolute 
pointer.
+         * Reads page ID and cache group ID from the page at the given 
absolute pointer.
          *
          * @param absPtr Absolute memory pointer to the page header.
          * @return Full page ID written to the page.
          */
         private static FullPageId fullPageId(final long absPtr) {
-            return new FullPageId(readPageId(absPtr), readPageCacheId(absPtr));
+            return new FullPageId(readPageId(absPtr), readPageGroupId(absPtr));
         }
 
         /**
-         * Writes page ID and cache ID from the page at the given absolute 
pointer.
+         * Writes page ID and cache group ID from the page at the given 
absolute pointer.
          *
          * @param absPtr Absolute memory pointer to the page header.
          * @param fullPageId Full page ID to write.
@@ -2637,7 +2633,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         private static void fullPageId(final long absPtr, final FullPageId 
fullPageId) {
             pageId(absPtr, fullPageId.pageId());
 
-            pageCacheId(absPtr, fullPageId.groupId());
+            pageGroupId(absPtr, fullPageId.groupId());
         }
     }
 
@@ -2734,8 +2730,13 @@ public class PageMemoryImpl implements PageMemoryEx {
      * Throttling enabled and its type enum.
      */
     public enum ThrottlingPolicy {
-        /** Not throttled. */NONE,
-        /** Target ratio based: CP progress is used as border. */ 
TARGET_RATIO_BASED,
-        /** Speed based. CP writting speed and estimated ideal speed are used 
as border */ SPEED_BASED
+        /** All ways of throttling are disabled. */
+        DISABLED,
+        /** Only exponential throttling is used to protect from CP buffer 
overflow. */
+        CHECKPOINT_BUFFER_ONLY,
+        /** Target ratio based: CP progress is used as border. */
+        TARGET_RATIO_BASED,
+        /** Speed based. CP writting speed and estimated ideal speed are used 
as border */
+        SPEED_BASED
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
index aaf5471..68fa529 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -196,21 +196,23 @@ public class PagesWriteSpeedBasedThrottle implements 
PagesWriteThrottlePolicy {
                         markDirtySpeed,
                         curCpWriteSpeed);
 
-                    level = ThrottleMode.LIMITED;
+                    level = throttleParkTimeNs == 0 ? ThrottleMode.NO : 
ThrottleMode.LIMITED;
                 }
             }
         }
 
-        if (level == ThrottleMode.NO) {
-            exponentialBackoffCntr.set(0);
-
-            throttleParkTimeNs = 0;
-        }
-        else if (level == ThrottleMode.EXPONENTIAL) {
+        if (level == ThrottleMode.EXPONENTIAL) {
             int exponent = exponentialBackoffCntr.getAndIncrement();
 
             throttleParkTimeNs = (long)(STARTING_THROTTLE_NANOS * 
Math.pow(BACKOFF_RATIO, exponent));
         }
+        else {
+            if (isPageInCheckpoint)
+                exponentialBackoffCntr.set(0);
+
+            if (level == ThrottleMode.NO)
+                throttleParkTimeNs = 0;
+        }
 
         if (throttleParkTimeNs > 0) {
             recurrentLogIfNeed();

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index 78e5344..166cdcd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -32,6 +32,9 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
     /** Database manager. */
     private final CheckpointWriteProgressSupplier cpProgress;
 
+    /** If true, throttle will only protect from checkpoint buffer overflow, 
not from dirty pages ratio cap excess. */
+    private final boolean throttleOnlyPagesInCheckpoint;
+
     /** Checkpoint lock state checker. */
     private CheckpointLockStateChecker stateChecker;
 
@@ -41,30 +44,36 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
     /** Backoff ratio. Each next park will be this times longer. */
     private static final double BACKOFF_RATIO = 1.05;
 
-    /** Exponential backoff counter. */
-    private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0);
+    /** Counter for dirty pages ratio throttling. */
+    private final AtomicInteger notInCheckpointBackoffCntr = new 
AtomicInteger(0);
+
+    /** Counter for checkpoint buffer usage ratio throttling (we need a 
separate one due to IGNITE-7751). */
+    private final AtomicInteger inCheckpointBackoffCntr = new AtomicInteger(0);
+
     /**
      * @param pageMemory Page memory.
      * @param cpProgress Database manager.
      * @param stateChecker checkpoint lock state checker.
+     * @param throttleOnlyPagesInCheckpoint If true, throttle will only 
protect from checkpoint buffer overflow.
      */
     public PagesWriteThrottle(PageMemoryImpl pageMemory,
         CheckpointWriteProgressSupplier cpProgress,
-        CheckpointLockStateChecker stateChecker) {
+        CheckpointLockStateChecker stateChecker,
+        boolean throttleOnlyPagesInCheckpoint
+    ) {
         this.pageMemory = pageMemory;
         this.cpProgress = cpProgress;
         this.stateChecker = stateChecker;
+        this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+
+        if (!throttleOnlyPagesInCheckpoint)
+            assert cpProgress != null : "cpProgress must be not null if ratio 
based throttling mode is used";
     }
 
     /** {@inheritDoc} */
     @Override public void onMarkDirty(boolean isPageInCheckpoint) {
         assert stateChecker.checkpointLockIsHeldByThread();
 
-        AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter();
-
-        if (writtenPagesCntr == null)
-            return; // Don't throttle if checkpoint is not running.
-
         boolean shouldThrottle = false;
 
         if (isPageInCheckpoint) {
@@ -73,7 +82,12 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
             shouldThrottle = pageMemory.checkpointBufferPagesCount() > 
checkpointBufLimit;
         }
 
-        if (!shouldThrottle) {
+        if (!shouldThrottle && !throttleOnlyPagesInCheckpoint) {
+            AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter();
+
+            if (writtenPagesCntr == null)
+                return; // Don't throttle if checkpoint is not running.
+
             int cpWrittenPages = writtenPagesCntr.get();
 
             int cpTotalPages = cpProgress.currentCheckpointPagesCount();
@@ -92,13 +106,15 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
             }
         }
 
+        AtomicInteger cntr = isPageInCheckpoint ? inCheckpointBackoffCntr : 
notInCheckpointBackoffCntr;
+
         if (shouldThrottle) {
-            int throttleLevel = exponentialBackoffCntr.getAndIncrement();
+            int throttleLevel = cntr.getAndIncrement();
 
             LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * 
Math.pow(BACKOFF_RATIO, throttleLevel)));
         }
         else
-            exponentialBackoffCntr.set(0);
+            cntr.set(0);
     }
 
     /** {@inheritDoc} */
@@ -107,6 +123,8 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
 
     /** {@inheritDoc} */
     @Override public void onFinishCheckpoint() {
-        exponentialBackoffCntr.set(0);
+        inCheckpointBackoffCntr.set(0);
+
+        notInCheckpointBackoffCntr.set(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index a305b7f..3737204 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -76,7 +76,7 @@ public class BPlusTreePageMemoryImplTest extends 
BPlusTreeSelfTest {
             },
             () -> true,
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE,
+            PageMemoryImpl.ThrottlingPolicy.DISABLED,
             null
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 1fc34c5..0c786ad 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -23,7 +23,6 @@ import 
org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.database.BPlusTreeReuseSelfTest;
@@ -77,7 +76,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends 
BPlusTreeReuseSelfTest
             },
             () -> true,
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE,
+            PageMemoryImpl.ThrottlingPolicy.DISABLED,
             null
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
index a87a61e..c6f42e1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
@@ -223,9 +223,8 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
 
         DirectMemoryProvider provider = new UnsafeMemoryProvider(log);
 
-        PageMemoryImpl memory
-            = new PageMemoryImpl(provider, sizes, sctx, pageSize,
-            pageWriter, null, () -> true, memMetrics, null, null);
+        PageMemoryImpl memory = new PageMemoryImpl(provider, sizes, sctx, 
pageSize,
+            pageWriter, null, () -> true, memMetrics, 
PageMemoryImpl.ThrottlingPolicy.DISABLED, null);
 
         memory.start();
         return memory;

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 1cef087..f9ca7e4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -23,7 +23,6 @@ import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteLogger;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
-import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.logger.NullLogger;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
index 4495dc1..9087b1c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
@@ -24,7 +24,6 @@ import 
org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.database.IndexStorageSelfTest;
@@ -92,7 +91,7 @@ public class IndexStoragePageMemoryImplTest extends 
IndexStorageSelfTest {
             },
             () -> true,
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE,
+            PageMemoryImpl.ThrottlingPolicy.DISABLED,
             null
         );
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 3c169be..34fd93b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -86,7 +86,8 @@ public class PageMemoryImplNoLoadTest extends 
PageMemoryNoLoadSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE, null
+            PageMemoryImpl.ThrottlingPolicy.DISABLED,
+            null
         );
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05d58bb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 8f0ef39..31af118 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -17,24 +17,40 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.mem.DirectMemoryProvider;
 import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
 import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.mockito.Mockito;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.CHECKPOINT_POOL_OVERFLOW_ERROR_MSG;
 
 /**
  *
@@ -46,11 +62,14 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
     /** Page size. */
     private static final int PAGE_SIZE = 1024;
 
+    /** Max memory size. */
+    private static final int MAX_SIZE = 128;
+
     /**
      * @throws Exception if failed.
      */
     public void testThatAllocationTooMuchPagesCauseToOOMException() throws 
Exception {
-        PageMemoryImpl memory = createPageMemory();
+        PageMemoryImpl memory = 
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
 
         try {
             while (!Thread.currentThread().isInterrupted())
@@ -64,15 +83,186 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     *
+     * @throws Exception If failed.
+     */
+    public void testCheckpointBufferOverusageDontCauseWriteLockLeak() throws 
Exception {
+        PageMemoryImpl memory = 
createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
+
+        List<FullPageId> pages = new ArrayList<>();
+
+        try {
+            while (!Thread.currentThread().isInterrupted()) {
+                long pageId = memory.allocatePage(1, 
PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+
+                FullPageId fullPageId = new FullPageId(pageId, 1);
+
+                pages.add(fullPageId);
+
+                acquireAndReleaseWriteLock(memory, fullPageId); //to set page 
id, otherwise we would fail with assertion error
+            }
+        }
+        catch (IgniteOutOfMemoryException ignore) {
+            //Success
+        }
+
+        memory.beginCheckpoint();
+
+        final AtomicReference<FullPageId> lastPage = new AtomicReference<>();
+
+        try {
+            for (FullPageId fullPageId : pages) {
+                lastPage.set(fullPageId);
+
+                acquireAndReleaseWriteLock(memory, fullPageId);
+            }
+        }
+        catch (Exception ex) {
+            
assertTrue(ex.getMessage().startsWith(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG));
+        }
+
+        memory.finishCheckpoint();
+
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    acquireAndReleaseWriteLock(memory, lastPage.get()); //we 
should be able to get lock again
+                }
+                catch (IgniteCheckedException e) {
+                    throw new AssertionError(e);
+                }
+            }
+        }).get(getTestTimeout());
+    }
+
+    /**
+     * Tests that checkpoint buffer won't be overflowed with enabled 
CHECKPOINT_BUFFER_ONLY throttling.
+     * @throws Exception If failed.
      */
-    private PageMemoryImpl createPageMemory() throws Exception {
+    public void testCheckpointBufferCantOverflowMixedLoad() throws Exception {
+        
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY);
+    }
+
+    /**
+     * Tests that checkpoint buffer won't be overflowed with enabled 
SPEED_BASED throttling.
+     * @throws Exception If failed.
+     */
+    public void testCheckpointBufferCantOverflowMixedLoadSpeedBased() throws 
Exception {
+        
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.SPEED_BASED);
+    }
+
+    /**
+     * Tests that checkpoint buffer won't be overflowed with enabled 
TARGET_RATIO_BASED throttling.
+     * @throws Exception If failed.
+     */
+    public void testCheckpointBufferCantOverflowMixedLoadRatioBased() throws 
Exception {
+        
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void 
testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy
 plc) throws Exception {
+        PageMemoryImpl memory = createPageMemory(plc);
+
+        List<FullPageId> pages = new ArrayList<>();
+
+        for (int i = 0; i < (MAX_SIZE - 10) * MB / PAGE_SIZE / 2; i++) {
+            long pageId = memory.allocatePage(1, 
PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+
+            FullPageId fullPageId = new FullPageId(pageId, 1);
+
+            pages.add(fullPageId);
+
+            acquireAndReleaseWriteLock(memory, fullPageId);
+        }
+
+        memory.beginCheckpoint();
+
+        CheckpointMetricsTracker mockTracker = 
Mockito.mock(CheckpointMetricsTracker.class);
+
+        for (FullPageId checkpointPage : pages)
+            memory.getForCheckpoint(checkpointPage, 
ByteBuffer.allocate(PAGE_SIZE), mockTracker);
+
+        memory.finishCheckpoint();
+
+        for (int i = (int)((MAX_SIZE - 10) * MB / PAGE_SIZE / 2); i < 
(MAX_SIZE - 20) * MB / PAGE_SIZE; i++) {
+            long pageId = memory.allocatePage(1, 
PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+
+            FullPageId fullPageId = new FullPageId(pageId, 1);
+
+            pages.add(fullPageId);
+
+            acquireAndReleaseWriteLock(memory, fullPageId);
+        }
+
+        memory.beginCheckpoint();
+
+        Collections.shuffle(pages); // Mix pages in checkpoint with clean pages
+
+        AtomicBoolean stop = new AtomicBoolean(false);
+
+        try {
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    for (FullPageId page : pages) {
+                        if (ThreadLocalRandom.current().nextDouble() < 0.5) // 
Mark dirty 50% of pages
+                            try {
+                                acquireAndReleaseWriteLock(memory, page);
+
+                                if (stop.get())
+                                    break;
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.error("runAsync ended with exception", e);
+
+                                fail();
+                            }
+                    }
+                }
+            }).get(5_000);
+        }
+        catch (IgniteFutureTimeoutCheckedException ex) {
+            // Expected.
+        }
+        finally {
+            stop.set(true);
+        }
+
+        memory.finishCheckpoint();
+    }
+
+    /**
+     * @param memory Memory.
+     * @param fullPageId Full page id.
+     * @throws IgniteCheckedException If acquiring lock failed.
+     */
+    private void acquireAndReleaseWriteLock(PageMemoryImpl memory, FullPageId 
fullPageId) throws IgniteCheckedException {
+        long page = memory.acquirePage(1, fullPageId.pageId());
+
+        long address = memory.writeLock(1, fullPageId.pageId(), page);
+
+        PageIO.setPageId(address, fullPageId.pageId());
+
+        PageIO.setType(address, PageIO.T_BPLUS_META);
+
+        PageUtils.putShort(address, PageIO.VER_OFF, (short)1);
+
+        memory.writeUnlock(1, fullPageId.pageId(), page, Boolean.FALSE, true);
+
+        memory.releasePage(1, fullPageId.pageId(), page);
+    }
+
+    /**
+     * @param throttlingPlc Throttling Policy.
+     * @throws Exception If creating mock failed.
+     */
+    private PageMemoryImpl createPageMemory(PageMemoryImpl.ThrottlingPolicy 
throttlingPlc) throws Exception {
         long[] sizes = new long[5];
 
         for (int i = 0; i < sizes.length; i++)
-            sizes[i] = 1024 * MB / 4;
+            sizes[i] = MAX_SIZE * MB / 4;
 
-        sizes[4] = 10 * MB;
+        sizes[4] = 5 * MB;
 
         DirectMemoryProvider provider = new UnsafeMemoryProvider(log);
 
@@ -101,6 +291,13 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             null
         );
 
+        CheckpointWriteProgressSupplier noThrottle = 
Mockito.mock(CheckpointWriteProgressSupplier.class);
+
+        
Mockito.when(noThrottle.currentCheckpointPagesCount()).thenReturn(1_000_000);
+        Mockito.when(noThrottle.evictedPagesCntr()).thenReturn(new 
AtomicInteger(0));
+        Mockito.when(noThrottle.syncedPagesCounter()).thenReturn(new 
AtomicInteger(1_000_000));
+        Mockito.when(noThrottle.writtenPagesCounter()).thenReturn(new 
AtomicInteger(1_000_000));
+
         PageMemoryImpl mem = new PageMemoryImpl(
             provider,
             sizes,
@@ -118,8 +315,8 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
                 }
             },
             new 
DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE,
-            null
+            throttlingPlc,
+            noThrottle
         );
 
         mem.start();

Reply via email to