IGNITE-7606 Write replaced page outside segment write lock - Fixes #3469.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b2f8cf8f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b2f8cf8f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b2f8cf8f

Branch: refs/heads/ignite-7485-2
Commit: b2f8cf8f436e544911b110ad6e2643329af99d4f
Parents: b253156
Author: dpavlov <dpav...@gridgain.com>
Authored: Thu Feb 8 18:14:00 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Thu Feb 8 21:46:26 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   6 +
 .../internal/pagemem/PageIdAllocator.java       |   3 +-
 .../CheckpointWriteProgressSupplier.java        |  45 +++
 .../GridCacheDatabaseSharedManager.java         |  53 ++--
 .../pagemem/DelayedDirtyPageWrite.java          | 105 +++++++
 .../pagemem/DelayedPageReplacementTracker.java  | 198 ++++++++++++
 .../persistence/pagemem/PageMemoryImpl.java     | 148 +++++----
 .../pagemem/PagesWriteSpeedBasedThrottle.java   |  30 +-
 .../persistence/pagemem/PagesWriteThrottle.java |  24 +-
 .../persistence/pagemem/ReplacedPageWriter.java |  35 +++
 .../checkpoint/IgniteMassLoadSandboxTest.java   |   4 +-
 .../pagemem/BPlusTreePageMemoryImplTest.java    |  19 +-
 .../BPlusTreeReuseListPageMemoryImplTest.java   |  18 +-
 ...gnitePageMemReplaceDelayedWriteUnitTest.java | 307 +++++++++++++++++++
 .../pagemem/IgniteThrottlingUnitTest.java       |  22 +-
 .../pagemem/IndexStoragePageMemoryImplTest.java |  18 +-
 .../pagemem/PageMemoryImplNoLoadTest.java       |  12 +-
 .../persistence/pagemem/PageMemoryImplTest.java |  13 +-
 .../testsuites/IgnitePdsUnitTestSuite.java      |   4 +-
 19 files changed, 891 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 2b221a1..8e72099 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -811,6 +811,12 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_DEV_ONLY_LOGGING_DISABLED = 
"IGNITE_DEV_ONLY_LOGGING_DISABLED";
 
     /**
+     * When set to {@code true} (default), pages are written to page store 
without holding segment lock (with delay).
+     * Because other thread may require exactly the same page to be loaded 
from store, reads are protected by locking.
+     */
+    public static final String IGNITE_DELAYED_REPLACED_PAGE_WRITE = 
"IGNITE_DELAYED_REPLACED_PAGE_WRITE";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
index dc504fe..ae7da14 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
@@ -38,6 +38,7 @@ public interface PageIdAllocator {
     /**
      * Allocates a page from the space for the given partition ID and the 
given flags.
      *
+     * @param cacheId Cache Group ID.
      * @param partId Partition ID.
      * @return Allocated page ID.
      */
@@ -46,7 +47,7 @@ public interface PageIdAllocator {
     /**
      * The given page is free now.
      *
-     * @param cacheId Cache ID.
+     * @param cacheId Cache Group ID.
      * @param pageId Page ID.
      */
     public boolean freePage(int cacheId, long pageId) throws 
IgniteCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java
new file mode 100644
index 0000000..d20e8d4
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Provider for counters of checkpoint writes progress.
+ */
+public interface CheckpointWriteProgressSupplier {
+    /**
+     * Counter for written checkpoint pages. Not null only if checkpoint is 
running.
+     */
+    public AtomicInteger writtenPagesCounter();
+
+    /**
+     * @return Counter for fsynced checkpoint pages. Not null only if 
checkpoint is running.
+     */
+    public AtomicInteger syncedPagesCounter();
+
+    /**
+     * @return Counter for evicted pages during current checkpoint. Not null 
only if checkpoint is running.
+     */
+    public AtomicInteger evictedPagesCntr();
+
+    /**
+     * @return Number of pages in current checkpoint. If checkpoint is not 
running, returns 0.
+     */
+    public int currentCheckpointPagesCount();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 1de48f6..e53bd44 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -170,7 +170,7 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.metastorag
  *
  */
 @SuppressWarnings({"unchecked", 
"NonPrivateFieldAccessedInSynchronizedContext"})
-public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedManager {
+public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedManager implements CheckpointWriteProgressSupplier {
     /** */
     public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = 
"IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
 
@@ -361,7 +361,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Counter for fsynced checkpoint pages. Not null only if checkpoint is 
running. */
     private volatile AtomicInteger syncedPagesCntr = null;
 
-    /** Counter for evictted checkpoint pages. Not null only if checkpoint is 
running. */
+    /** Counter for evicted checkpoint pages. Not null only if checkpoint is 
running. */
     private volatile AtomicInteger evictedPagesCntr = null;
 
     /** Number of pages in current checkpoint at the beginning of checkpoint. 
*/
@@ -980,28 +980,23 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             ),
             cctx,
             memCfg.getPageSize(),
-            new GridInClosure3X<FullPageId, ByteBuffer, Integer>() {
-                @Override public void applyx(
-                    FullPageId fullId,
-                    ByteBuffer pageBuf,
-                    Integer tag
-                ) throws IgniteCheckedException {
-                    // First of all, write page to disk.
-                    storeMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, 
tag);
+            (fullId, pageBuf, tag) -> {
+                // First of all, write page to disk.
+                storeMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, 
tag);
 
-                    // Only after write we can write page into snapshot.
-                    snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
+                // Only after write we can write page into snapshot.
+                snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
 
-                    AtomicInteger cntr = evictedPagesCntr;
+                AtomicInteger cntr = evictedPagesCntr;
 
-                    if (cntr != null)
-                        cntr.incrementAndGet();
-                }
+                if (cntr != null)
+                    cntr.incrementAndGet();
             },
             changeTracker,
             this,
             memMetrics,
-            plc
+            plc,
+            this
         );
 
         memMetrics.pageMemory(pageMem);
@@ -2627,31 +2622,23 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         }
     }
 
-    /**
-     * Counter for written checkpoint pages. Not null only if checkpoint is 
running.
-     */
-    public AtomicInteger writtenPagesCounter() {
+    /** {@inheritDoc} */
+    @Override public AtomicInteger writtenPagesCounter() {
         return writtenPagesCntr;
     }
 
-    /**
-     * @return Counter for fsynced checkpoint pages. Not null only if 
checkpoint is running.
-     */
-    public AtomicInteger syncedPagesCounter() {
+    /** {@inheritDoc} */
+    @Override public AtomicInteger syncedPagesCounter() {
         return syncedPagesCntr;
     }
 
-    /**
-     * @return Counter for evicted pages during current checkpoint. Not null 
only if checkpoint is running.
-     */
-    public AtomicInteger evictedPagesCntr() {
+    /** {@inheritDoc} */
+    @Override public AtomicInteger evictedPagesCntr() {
         return evictedPagesCntr;
     }
 
-    /**
-     * @return Number of pages in current checkpoint. If checkpoint is not 
running, returns 0.
-     */
-    public int currentCheckpointPagesCount() {
+    /** {@inheritDoc} */
+    @Override public int currentCheckpointPagesCount() {
         return currCheckpointPagesCnt;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
new file mode 100644
index 0000000..6eec609
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Not thread safe and stateful class for replacement of one page with write() 
delay. This allows to write page content
+ * without holding segment lock. Page data is copied into temp buffer during 
{@link #writePage(FullPageId, ByteBuffer,
+ * int)} and then sent to real implementation by {@link #finishReplacement()}.
+ */
+public class DelayedDirtyPageWrite implements ReplacedPageWriter {
+    /** Real flush dirty page implementation. */
+    private final ReplacedPageWriter flushDirtyPage;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** Thread local with byte buffers. */
+    private final ThreadLocal<ByteBuffer> byteBufThreadLoc;
+
+    /** Replacing pages tracker, used to register & unregister pages being 
written. */
+    private final DelayedPageReplacementTracker tracker;
+
+    /** Full page id to be written on {@link #finishReplacement()} or null if 
nothing to write. */
+    @Nullable private FullPageId fullPageId;
+
+    /** Byte buffer with page data to be written on {@link 
#finishReplacement()} or null if nothing to write. */
+    @Nullable private ByteBuffer byteBuf;
+
+    /** Partition update tag to be used in{@link #finishReplacement()} or null 
if -1 to write. */
+    private int tag = -1;
+
+    /**
+     * @param flushDirtyPage real writer to save page to store.
+     * @param byteBufThreadLoc thread local buffers to use for pages copying.
+     * @param pageSize page size.
+     * @param tracker tracker to lock/unlock page reads.
+     */
+    public DelayedDirtyPageWrite(ReplacedPageWriter flushDirtyPage,
+        ThreadLocal<ByteBuffer> byteBufThreadLoc, int pageSize,
+        DelayedPageReplacementTracker tracker) {
+        this.flushDirtyPage = flushDirtyPage;
+        this.pageSize = pageSize;
+        this.byteBufThreadLoc = byteBufThreadLoc;
+        this.tracker = tracker;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writePage(FullPageId fullPageId, ByteBuffer byteBuf, 
int tag) {
+        tracker.lock(fullPageId);
+
+        ByteBuffer tlb = byteBufThreadLoc.get();
+
+        tlb.rewind();
+
+        long writeAddr = GridUnsafe.bufferAddress(tlb);
+        long origBufAddr = GridUnsafe.bufferAddress(byteBuf);
+
+        GridUnsafe.copyMemory(origBufAddr, writeAddr, pageSize);
+
+        this.fullPageId = fullPageId;
+        this.byteBuf = tlb;
+        this.tag = tag;
+    }
+
+    /**
+     * Runs actual write if required. Method is 'no op' if there was no page 
selected for replacement.
+     * @throws IgniteCheckedException if write failed.
+     */
+    public void finishReplacement() throws IgniteCheckedException {
+        if (byteBuf == null && fullPageId == null)
+            return;
+
+        try {
+            flushDirtyPage.writePage(fullPageId, byteBuf, tag);
+        }
+        finally {
+            tracker.unlock(fullPageId);
+
+            fullPageId = null;
+            byteBuf = null;
+            tag = -1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
new file mode 100644
index 0000000..9cf5b77
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.FullPageId;
+
+/**
+ * Delayed page writes tracker. Provides delayed write implementations and 
allows to check if page is actually being
+ * written to page store.
+ */
+public class DelayedPageReplacementTracker {
+    /** Page size. */
+    private final int pageSize;
+
+    /** Flush dirty page real implementation. */
+    private final ReplacedPageWriter flushDirtyPage;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Lock stripes for pages read protection. */
+    private final Stripe[] stripes;
+
+    /** Byte buffer thread local. */
+    private final ThreadLocal<ByteBuffer> byteBufThreadLoc
+        = new ThreadLocal<ByteBuffer>() {
+        @Override protected ByteBuffer initialValue() {
+            ByteBuffer buf = ByteBuffer.allocateDirect(pageSize);
+
+            buf.order(ByteOrder.LITTLE_ENDIAN);
+
+            return buf;
+        }
+    };
+
+    /**
+     * Dirty page write for replacement operations thread local. Because page 
write {@link DelayedDirtyPageWrite} is
+     * stateful and not thread safe, this thread local protects from GC 
pressure on pages replacement.
+     */
+    private final ThreadLocal<DelayedDirtyPageWrite> delayedPageWriteThreadLoc
+        = new ThreadLocal<DelayedDirtyPageWrite>() {
+        @Override protected DelayedDirtyPageWrite initialValue() {
+            return new DelayedDirtyPageWrite(flushDirtyPage, byteBufThreadLoc, 
pageSize,
+                DelayedPageReplacementTracker.this);
+        }
+    };
+
+    /**
+     * @param pageSize Page size.
+     * @param flushDirtyPage Flush dirty page.
+     * @param log Logger.
+     * @param segmentCnt Segments count.
+     */
+    public DelayedPageReplacementTracker(int pageSize, ReplacedPageWriter 
flushDirtyPage,
+        IgniteLogger log, int segmentCnt) {
+        this.pageSize = pageSize;
+        this.flushDirtyPage = flushDirtyPage;
+        this.log = log;
+        stripes = new Stripe[segmentCnt];
+
+        for (int i = 0; i < stripes.length; i++)
+            stripes[i] = new Stripe();
+    }
+
+    /**
+     * @return delayed page write implementation, finish method to be called 
to actually write page.
+     */
+    public DelayedDirtyPageWrite delayedPageWrite() {
+        return delayedPageWriteThreadLoc.get();
+    }
+
+    /**
+     * @param id Full page ID
+     * @return stripe related to current page identifier.
+     */
+    private Stripe stripe(FullPageId id) {
+        int segmentIdx = PageMemoryImpl.segmentIndex(id.groupId(), 
id.pageId(), stripes.length);
+
+        return stripes[segmentIdx];
+    }
+
+    /**
+     * @param id full page ID to lock from read
+     */
+    public void lock(FullPageId id) {
+        stripe(id).lock(id);
+    }
+
+    /**
+     * Method is returned when page is available to be loaded from store, or 
waits for replacement finish.
+     *
+     * @param id full page ID to be loaded from store.
+     */
+    public void waitUnlock(FullPageId id) {
+        stripe(id).waitUnlock(id);
+    }
+
+    /**
+     * @param id full page ID, which write has been finished, it is available 
for reading.
+     */
+    public void unlock(FullPageId id) {
+        stripe(id).unlock(id);
+    }
+
+    /**
+     * Stripe for locking pages from reading from store in parallel with not 
finished write.
+     */
+    private class Stripe {
+        /**
+         * Page IDs which are locked for reading from store. Page content is 
being written right now. guarded by
+         * collection object monitor.
+         */
+        private final Collection<FullPageId> locked = new 
HashSet<>(Runtime.getRuntime().availableProcessors() * 2);
+
+        /**
+         * Has locked pages, flag for fast check if there are some pages, what 
were replaced and is being written. Write
+         * to field is guarded by {@link #locked} monitor.
+         */
+        private volatile boolean hasLockedPages;
+
+        /**
+         * @param id full page ID to lock from read
+         */
+        public void lock(FullPageId id) {
+            synchronized (locked) {
+                hasLockedPages = true;
+
+                boolean add = locked.add(id);
+
+                assert add : "Double locking of page for replacement is not 
possible";
+            }
+        }
+
+        /**
+         * Method is returned when page is available to be loaded from store, 
or waits for replacement finish.
+         *
+         * @param id full page ID to be loaded from store.
+         */
+        public void waitUnlock(FullPageId id) {
+            if (!hasLockedPages)
+                return;
+
+            synchronized (locked) {
+                if (!hasLockedPages)
+                    return;
+
+                while (locked.contains(id)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Found replaced page [" + id + "] which is 
being written to page store, wait for finish replacement");
+
+                    try {
+                        locked.wait();
+                    }
+                    catch (InterruptedException e) {
+                        throw new IgniteInterruptedException(e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param id full page ID, which write has been finished, it is 
available for reading.
+         */
+        public void unlock(FullPageId id) {
+            synchronized (locked) {
+                boolean rmv = locked.remove(id);
+
+                assert rmv : "Unlocking page ID never locked, id " + id;
+
+                if (locked.isEmpty())
+                    hasLockedPages = false;
+
+                locked.notifyAll();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index e4c369d..c9670bc 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -59,8 +59,8 @@ import 
org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
-import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
@@ -85,6 +85,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
 
 /**
@@ -181,7 +183,7 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Shared context. */
     private final GridCacheSharedContext<?, ?> ctx;
 
-    /** State checker. */
+    /** Checkpoint lock state provider. */
     private final CheckpointLockStateChecker stateChecker;
 
     /** Number of used pages in checkpoint buffer. */
@@ -217,10 +219,17 @@ public class PageMemoryImpl implements PageMemoryEx {
     private OffheapReadWriteLock rwLock;
 
     /** Flush dirty page closure. When possible, will be called by 
evictPage(). */
-    private final GridInClosure3X<FullPageId, ByteBuffer, Integer> 
flushDirtyPage;
+    private final ReplacedPageWriter flushDirtyPage;
 
     /**
-     * Flush dirty page closure. When possible, will be called by evictPage().
+     * Delayed page replacement (rotation with disk) tracker. Because other 
thread may require exactly the same page to be loaded from store,
+     * reads are protected by locking.
+     * {@code Null} if delayed write functionality is disabled.
+     */
+    @Nullable private final DelayedPageReplacementTracker 
delayedPageReplacementTracker;
+
+    /**
+     * Callback invoked to track changes in pages.
      * {@code Null} if page tracking functionality is disabled
      * */
     @Nullable private final GridInClosure3X<Long, FullPageId, PageMemoryEx> 
changeTracker;
@@ -231,35 +240,44 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Write throttle type. */
     private ThrottlingPolicy throttlingPlc;
 
-    /**  */
-    private boolean pageEvictWarned;
+    /** Checkpoint progress provider. Null disables throttling. */
+    @Nullable private final CheckpointWriteProgressSupplier cpProgressProvider;
+
+    /** Flag indicating page replacement started (rotation with disk), 
allocating new page requires freeing old one. */
+    private volatile boolean pageReplacementWarned;
 
     /** */
     private long[] sizes;
 
-    /** */
+    /** Memory metrics to track dirty pages count and page replace rate. */
     private DataRegionMetricsImpl memMetrics;
 
     /**
      * @param directMemoryProvider Memory allocator to use.
+     * @param sizes segments sizes, last is checkpoint pool size.
      * @param ctx Cache shared context.
      * @param pageSize Page size.
-     * @param flushDirtyPage Callback invoked when a dirty page is evicted.
+     * @param flushDirtyPage write callback invoked when a dirty page is 
removed for replacement.
      * @param changeTracker Callback invoked to track changes in pages.
-     * @param throttlingPlc Write throttle enabled and type.
+     * @param stateChecker Checkpoint lock state provider. Used to ensure lock 
is held by thread, which modify pages.
+     * @param memMetrics Memory metrics to track dirty pages count and page 
replace rate.
+     * @param throttlingPlc Write throttle enabled and its type. Null equal to 
none.
+     * @param cpProgressProvider checkpoint progress, base for throttling. 
Null disables throttling.
      */
     public PageMemoryImpl(
         DirectMemoryProvider directMemoryProvider,
         long[] sizes,
         GridCacheSharedContext<?, ?> ctx,
         int pageSize,
-        GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage,
+        ReplacedPageWriter flushDirtyPage,
         @Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> 
changeTracker,
         CheckpointLockStateChecker stateChecker,
         DataRegionMetricsImpl memMetrics,
-        ThrottlingPolicy throttlingPlc
+        @Nullable ThrottlingPolicy throttlingPlc,
+        @Nullable CheckpointWriteProgressSupplier cpProgressProvider
     ) {
         assert ctx != null;
+        assert pageSize > 0;
 
         log = ctx.logger(PageMemoryImpl.class);
 
@@ -267,9 +285,14 @@ public class PageMemoryImpl implements PageMemoryEx {
         this.directMemoryProvider = directMemoryProvider;
         this.sizes = sizes;
         this.flushDirtyPage = flushDirtyPage;
+        delayedPageReplacementTracker =
+            getBoolean(IGNITE_DELAYED_REPLACED_PAGE_WRITE, true)
+                ? new DelayedPageReplacementTracker(pageSize, flushDirtyPage, 
log, sizes.length - 1) :
+                null;
         this.changeTracker = changeTracker;
         this.stateChecker = stateChecker;
-        this.throttlingPlc = throttlingPlc;
+        this.throttlingPlc = throttlingPlc != null ? throttlingPlc : 
ThrottlingPolicy.NONE;
+        this.cpProgressProvider = cpProgressProvider;
 
         storeMgr = ctx.pageStore();
         walMgr = ctx.wal();
@@ -340,21 +363,19 @@ public class PageMemoryImpl implements PageMemoryEx {
      *
      */
     private void initWriteThrottle() {
-        if (!(ctx.database() instanceof GridCacheDatabaseSharedManager)) {
-            log.error("Write throttle can't start. Unexpected class of 
database manager: " +
-                ctx.database().getClass());
+        if (!isThrottlingEnabled())
+            return;
+
+        if (cpProgressProvider == null) {
+            log.error("Write throttle can't start. CP progress provider not 
presented");
 
             throttlingPlc = ThrottlingPolicy.NONE;
         }
 
-        if (isThrottlingEnabled()) {
-            GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)ctx.database();
-
-            if (throttlingPlc == ThrottlingPolicy.SPEED_BASED)
-                writeThrottle = new PagesWriteSpeedBasedThrottle(this, db, 
log);
-            else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
-                writeThrottle = new PagesWriteThrottle(this, db);
-        }
+        if (throttlingPlc == ThrottlingPolicy.SPEED_BASED)
+            writeThrottle = new PagesWriteSpeedBasedThrottle(this, 
cpProgressProvider, stateChecker, log);
+        else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
+            writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, 
stateChecker);
     }
 
     /** {@inheritDoc} */
@@ -435,7 +456,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             flags == PageIdAllocator.FLAG_IDX && partId == 
PageIdAllocator.INDEX_PARTITION :
             "flags = " + flags + ", partId = " + partId;
 
-        assert ctx.database().checkpointLockIsHeldByThread();
+        assert stateChecker.checkpointLockIsHeldByThread();
 
         if (isThrottlingEnabled())
             writeThrottle.onMarkDirty(false);
@@ -449,6 +470,9 @@ public class PageMemoryImpl implements PageMemoryEx {
         // because there is no crc inside them.
         Segment seg = segment(cacheId, pageId);
 
+        DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != 
null
+            ? delayedPageReplacementTracker.delayedPageWrite() : null;
+
         FullPageId fullId = new FullPageId(pageId, cacheId);
 
         seg.writeLock().lock();
@@ -471,7 +495,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 relPtr = seg.borrowOrAllocateFreePage(pageId);
 
             if (relPtr == INVALID_REL_PTR)
-                relPtr = seg.evictPage();
+                relPtr = seg.removePageForReplacement(delayedWriter == null ? 
flushDirtyPage : delayedWriter);
 
             long absPtr = seg.absolute(relPtr);
 
@@ -529,6 +553,9 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
         finally {
             seg.writeLock().unlock();
+
+            if (delayedWriter != null)
+                delayedWriter.finishReplacement();
         }
 
         //we have allocated 'tracking' page, we need to allocate regular one
@@ -619,6 +646,9 @@ public class PageMemoryImpl implements PageMemoryEx {
             seg.readLock().unlock();
         }
 
+        DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != 
null
+            ? delayedPageReplacementTracker.delayedPageWrite() : null;
+
         seg.writeLock().lock();
 
         try {
@@ -637,7 +667,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 relPtr = seg.borrowOrAllocateFreePage(pageId);
 
                 if (relPtr == INVALID_REL_PTR)
-                    relPtr = seg.evictPage();
+                    relPtr = seg.removePageForReplacement(delayedWriter == 
null ? flushDirtyPage : delayedWriter);
 
                 absPtr = seg.absolute(relPtr);
 
@@ -664,6 +694,9 @@ public class PageMemoryImpl implements PageMemoryEx {
                     try {
                         ByteBuffer buf = wrapPointer(pageAddr, pageSize());
 
+                        if (delayedPageReplacementTracker != null)
+                            delayedPageReplacementTracker.waitUnlock(fullId);
+
                         storeMgr.read(cacheId, pageId, buf);
                     }
                     catch (IgniteDataIntegrityViolationException ignore) {
@@ -716,6 +749,9 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
         finally {
             seg.writeLock().unlock();
+
+            if (delayedWriter != null)
+                delayedWriter.finishReplacement();
         }
     }
 
@@ -1508,7 +1544,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         boolean wasDirty = PageHeader.dirty(absPtr, dirty);
 
         if (dirty) {
-            assert ctx.database().checkpointLockIsHeldByThread();
+            assert stateChecker.checkpointLockIsHeldByThread();
 
             if (!wasDirty || forceAdd) {
                 boolean added = segment(pageId.groupId(), 
pageId.pageId()).dirtyPages.add(pageId);
@@ -1890,14 +1926,15 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
 
         /**
-         * Prepares a page for eviction, if needed.
+         * Prepares a page removal for page replacement, if needed.
          *
          * @param fullPageId Candidate page full ID.
          * @param absPtr Absolute pointer of the page to evict.
-         * @return {@code True} if it is ok to evict this page, {@code false} 
if another page should be selected.
+         * @param saveDirtyPage implementation to save dirty page to 
persistent storage.
+         * @return {@code True} if it is ok to replace this page, {@code 
false} if another page should be selected.
          * @throws IgniteCheckedException If failed to write page to the 
underlying store during eviction.
          */
-        private boolean prepareEvict(FullPageId fullPageId, long absPtr) 
throws IgniteCheckedException {
+        private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, 
ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
             assert writeLock().isHeldByCurrentThread();
 
             // Do not evict cache meta pages.
@@ -1919,7 +1956,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                     memMetrics.updatePageReplaceRate(U.currentTimeMillis() - 
PageHeader.readTimestamp(absPtr));
 
-                    flushDirtyPage.applyx(
+                    saveDirtyPage.writePage(
                         fullPageId,
                         wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()),
                         partTag(
@@ -1984,16 +2021,17 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
 
         /**
-         * Evict random oldest page from memory to storage.
+         * Removes random oldest page for page replacement from memory to 
storage.
          *
-         * @return Relative address for evicted page.
+         * @return Relative address for removed page, now it can be replaced 
by allocated or reloaded page.
          * @throws IgniteCheckedException If failed to evict page.
+         * @param saveDirtyPage Replaced page writer, implementation to save 
dirty page to persistent storage.
          */
-        private long evictPage() throws IgniteCheckedException {
+        private long removePageForReplacement(ReplacedPageWriter 
saveDirtyPage) throws IgniteCheckedException {
             assert getWriteHoldCount() > 0;
 
-            if (!pageEvictWarned) {
-                pageEvictWarned = true;
+            if (!pageReplacementWarned) {
+                pageReplacementWarned = true;
 
                 U.warn(log, "Page evictions started, this will affect storage 
performance (consider increasing " +
                     "DataRegionConfiguration#setMaxSize).");
@@ -2022,7 +2060,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             // every time the same page may be found.
             Set<Long> ignored = null;
 
-            long relEvictAddr = INVALID_REL_PTR;
+            long relRmvAddr = INVALID_REL_PTR;
 
             int iterations = 0;
 
@@ -2068,7 +2106,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                     boolean skip = ignored != null && 
ignored.contains(rndAddr);
 
-                    if (relEvictAddr == rndAddr || pinned || skip) {
+                    if (relRmvAddr == rndAddr || pinned || skip) {
                         i--;
 
                         continue;
@@ -2095,33 +2133,30 @@ public class PageMemoryImpl implements PageMemoryEx {
                         metaTs = pageTs;
                     }
 
-                    if (cleanAddr != INVALID_REL_PTR) {
-                        relEvictAddr = cleanAddr;
-                    }
-                    else if (dirtyAddr != INVALID_REL_PTR) {
-                        relEvictAddr = dirtyAddr;
-                    }
-                    else {
-                        relEvictAddr = metaAddr;
-                    }
+                    if (cleanAddr != INVALID_REL_PTR)
+                        relRmvAddr = cleanAddr;
+                    else if (dirtyAddr != INVALID_REL_PTR)
+                        relRmvAddr = dirtyAddr;
+                    else
+                        relRmvAddr = metaAddr;
                 }
 
-                assert relEvictAddr != INVALID_REL_PTR;
+                assert relRmvAddr != INVALID_REL_PTR;
 
-                final long absEvictAddr = absolute(relEvictAddr);
+                final long absRmvAddr = absolute(relRmvAddr);
 
-                final FullPageId fullPageId = 
PageHeader.fullPageId(absEvictAddr);
+                final FullPageId fullPageId = 
PageHeader.fullPageId(absRmvAddr);
 
-                if (!prepareEvict(fullPageId, absEvictAddr)) {
+                if (!preparePageRemoval(fullPageId, absRmvAddr, 
saveDirtyPage)) {
                     if (iterations > 10) {
                         if (ignored == null)
                             ignored = new HashSet<>();
 
-                        ignored.add(relEvictAddr);
+                        ignored.add(relRmvAddr);
                     }
 
                     if (iterations > pool.pages() * FULL_SCAN_THRESHOLD)
-                        return tryToFindSequentially(cap);
+                        return tryToFindSequentially(cap, saveDirtyPage);
 
                     continue;
                 }
@@ -2135,7 +2170,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                     )
                 );
 
-                return relEvictAddr;
+                return relRmvAddr;
             }
         }
 
@@ -2165,8 +2200,9 @@ public class PageMemoryImpl implements PageMemoryEx {
          * Will scan all segment pages to find one to evict it
          *
          * @param cap Capacity.
+         * @param saveDirtyPage Evicted page writer.
          */
-        private long tryToFindSequentially(int cap) throws 
IgniteCheckedException {
+        private long tryToFindSequentially(int cap, ReplacedPageWriter 
saveDirtyPage) throws IgniteCheckedException {
             assert getWriteHoldCount() > 0;
 
             long prevAddr = INVALID_REL_PTR;
@@ -2201,7 +2237,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                 final FullPageId fullPageId = 
PageHeader.fullPageId(absEvictAddr);
 
-                if (prepareEvict(fullPageId, absEvictAddr)) {
+                if (preparePageRemoval(fullPageId, absEvictAddr, 
saveDirtyPage)) {
                     loadedPages.remove(
                         fullPageId.groupId(),
                         PageIdUtils.effectivePageId(fullPageId.pageId()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
index cb19eca..aaf5471 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteLogger;
-import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 
 /**
@@ -37,7 +38,7 @@ public class PagesWriteSpeedBasedThrottle implements 
PagesWriteThrottlePolicy {
     private final PageMemoryImpl pageMemory;
 
     /** Database manager. */
-    private final GridCacheDatabaseSharedManager dbSharedMgr;
+    private final CheckpointWriteProgressSupplier cpProgress;
 
     /** Starting throttle time. Limits write speed to 1000 MB/s. */
     private static final long STARTING_THROTTLE_NANOS = 4000;
@@ -91,6 +92,9 @@ public class PagesWriteSpeedBasedThrottle implements 
PagesWriteThrottlePolicy {
     /** Total pages which is possible to store in page memory. */
     private long totalPages;
 
+    /** Checkpoint lock state provider. */
+    private CheckpointLockStateChecker cpLockStateChecker;
+
     /** Logger. */
     private IgniteLogger log;
 
@@ -105,22 +109,26 @@ public class PagesWriteSpeedBasedThrottle implements 
PagesWriteThrottlePolicy {
 
     /**
      * @param pageMemory Page memory.
-     * @param dbSharedMgr Database manager.
+     * @param cpProgress Database manager.
+     * @param stateChecker Checkpoint lock state provider.
      * @param log Logger.
      */
     public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory,
-        GridCacheDatabaseSharedManager dbSharedMgr, IgniteLogger log) {
+        CheckpointWriteProgressSupplier cpProgress,
+        CheckpointLockStateChecker stateChecker,
+        IgniteLogger log) {
         this.pageMemory = pageMemory;
-        this.dbSharedMgr = dbSharedMgr;
+        this.cpProgress = cpProgress;
         totalPages = pageMemory.totalPages();
+        this.cpLockStateChecker = stateChecker;
         this.log = log;
     }
 
     /** {@inheritDoc} */
     @Override public void onMarkDirty(boolean isPageInCheckpoint) {
-        assert dbSharedMgr.checkpointLockIsHeldByThread();
+        assert cpLockStateChecker.checkpointLockIsHeldByThread();
 
-        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+        AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter();
 
         if (writtenPagesCntr == null) {
             speedForMarkAll = 0;
@@ -226,7 +234,7 @@ public class PagesWriteSpeedBasedThrottle implements 
PagesWriteThrottlePolicy {
      * @return number of written pages.
      */
     private int cpWrittenPages() {
-        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+        AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter();
 
         return writtenPagesCntr == null ? 0 : writtenPagesCntr.get();
     }
@@ -235,14 +243,14 @@ public class PagesWriteSpeedBasedThrottle implements 
PagesWriteThrottlePolicy {
      * @return Number of pages in current checkpoint.
      */
     private int cpTotalPages() {
-        return dbSharedMgr.currentCheckpointPagesCount();
+        return cpProgress.currentCheckpointPagesCount();
     }
 
     /**
      * @return  Counter for fsynced checkpoint pages.
      */
     private int cpSyncedPages() {
-        AtomicInteger syncedPagesCntr = dbSharedMgr.syncedPagesCounter();
+        AtomicInteger syncedPagesCntr = cpProgress.syncedPagesCounter();
 
         return syncedPagesCntr == null ? 0 : syncedPagesCntr.get();
     }
@@ -251,7 +259,7 @@ public class PagesWriteSpeedBasedThrottle implements 
PagesWriteThrottlePolicy {
      * @return number of evicted pages.
      */
     private int cpEvictedPages() {
-        AtomicInteger evictedPagesCntr = dbSharedMgr.evictedPagesCntr();
+        AtomicInteger evictedPagesCntr = cpProgress.evictedPagesCntr();
 
         return evictedPagesCntr == null ? 0 : evictedPagesCntr.get();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index 9206935..78e5344 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -18,7 +18,8 @@ package 
org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
-import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 
 /**
  * Throttles threads that generate dirty pages during ongoing checkpoint.
@@ -29,7 +30,10 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
     private final PageMemoryImpl pageMemory;
 
     /** Database manager. */
-    private final GridCacheDatabaseSharedManager dbSharedMgr;
+    private final CheckpointWriteProgressSupplier cpProgress;
+
+    /** Checkpoint lock state checker. */
+    private CheckpointLockStateChecker stateChecker;
 
     /** Starting throttle time. Limits write speed to 1000 MB/s. */
     private static final long STARTING_THROTTLE_NANOS = 4000;
@@ -41,18 +45,22 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
     private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0);
     /**
      * @param pageMemory Page memory.
-     * @param dbSharedMgr Database manager.
+     * @param cpProgress Database manager.
+     * @param stateChecker checkpoint lock state checker.
      */
-    public PagesWriteThrottle(PageMemoryImpl pageMemory, 
GridCacheDatabaseSharedManager dbSharedMgr) {
+    public PagesWriteThrottle(PageMemoryImpl pageMemory,
+        CheckpointWriteProgressSupplier cpProgress,
+        CheckpointLockStateChecker stateChecker) {
         this.pageMemory = pageMemory;
-        this.dbSharedMgr = dbSharedMgr;
+        this.cpProgress = cpProgress;
+        this.stateChecker = stateChecker;
     }
 
     /** {@inheritDoc} */
     @Override public void onMarkDirty(boolean isPageInCheckpoint) {
-        assert dbSharedMgr.checkpointLockIsHeldByThread();
+        assert stateChecker.checkpointLockIsHeldByThread();
 
-        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+        AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter();
 
         if (writtenPagesCntr == null)
             return; // Don't throttle if checkpoint is not running.
@@ -68,7 +76,7 @@ public class PagesWriteThrottle implements 
PagesWriteThrottlePolicy {
         if (!shouldThrottle) {
             int cpWrittenPages = writtenPagesCntr.get();
 
-            int cpTotalPages = dbSharedMgr.currentCheckpointPagesCount();
+            int cpTotalPages = cpProgress.currentCheckpointPagesCount();
 
             if (cpWrittenPages == cpTotalPages) {
                 // Checkpoint is already in fsync stage, increasing maximum 
ratio of dirty pages to 3/4

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
new file mode 100644
index 0000000..30f9633
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+
+/**
+ * Flush (write) dirty page implementation for freed page during page 
replacement. When possible, will be called by
+ * removePageForReplacement().
+ */
+public interface ReplacedPageWriter {
+    /**
+     * @param fullPageId Full page ID being evicted.
+     * @param byteBuf Buffer with page data.
+     * @param tag partition update tag, increasing counter.
+     * @throws IgniteCheckedException if page write failed.
+     */
+    void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) throws 
IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
index c49f08e..b043fa9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
@@ -110,7 +110,7 @@ public class IgniteMassLoadSandboxTest extends 
GridCommonAbstractTest {
         DataRegionConfiguration regCfg = new DataRegionConfiguration()
             .setName("dfltMemPlc")
             .setMetricsEnabled(true)
-            .setMaxSize(2 * 1024L * 1024 * 1024)
+            .setMaxSize(256L * 1024 * 1024)
             .setPersistenceEnabled(true);
 
         DataStorageConfiguration dsCfg = new DataStorageConfiguration();
@@ -232,6 +232,8 @@ public class IgniteMassLoadSandboxTest extends 
GridCommonAbstractTest {
             // 
System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, 
"true");
             
System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, 
"false");
             
System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED,
 "speed");
+            
System.setProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE, 
"true");
+
 
             setWalArchAndWorkToSameVal = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 1b6fde3..a305b7f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -17,16 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.internal.mem.DirectMemoryProvider;
 import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
-import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.database.BPlusTreeSelfTest;
 import org.apache.ignite.internal.util.typedef.CIX3;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -69,22 +67,17 @@ public class BPlusTreePageMemoryImplTest extends 
BPlusTreeSelfTest {
             provider, sizes,
             sharedCtx,
             PAGE_SIZE,
-            new CIX3<FullPageId, ByteBuffer, Integer>() {
-                @Override public void applyx(FullPageId fullPageId, ByteBuffer 
byteBuf, Integer tag) {
-                    assert false : "No evictions should happen during the 
test";
-                }
+            (fullPageId, byteBuf, tag) -> {
+                assert false : "No page replacement should happen during the 
test";
             },
             new CIX3<Long, FullPageId, PageMemoryEx>(){
                 @Override public void applyx(Long aLong, FullPageId 
fullPageId, PageMemoryEx ex) {
                 }
             },
-            new CheckpointLockStateChecker() {
-                @Override public boolean checkpointLockIsHeldByThread() {
-                    return true;
-                }
-            },
+            () -> true,
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE
+            PageMemoryImpl.ThrottlingPolicy.NONE,
+            null
         );
 
         mem.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 1180164..1fc34c5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.internal.mem.DirectMemoryProvider;
 import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
@@ -25,11 +24,10 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
-import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.database.BPlusTreeReuseSelfTest;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
-import org.apache.ignite.internal.util.typedef.CIX3;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 
 /**
@@ -70,21 +68,17 @@ public class BPlusTreeReuseListPageMemoryImplTest extends 
BPlusTreeReuseSelfTest
             provider, sizes,
             sharedCtx,
             PAGE_SIZE,
-            new CIX3<FullPageId, ByteBuffer, Integer>() {
-                @Override public void applyx(FullPageId fullPageId, ByteBuffer 
byteBuf, Integer tag) {
-                    assert false : "No evictions should happen during the 
test";
-                }
+            (fullPageId, byteBuf, tag) -> {
+                assert false : "No page replacement (rotation with disk) 
should happen during the test";
             },
             new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
                 @Override public void applyx(Long page, FullPageId fullPageId, 
PageMemoryEx pageMem) {
                 }
-            }, new CheckpointLockStateChecker() {
-                @Override public boolean checkpointLockIsHeldByThread() {
-                    return true;
-                }
             },
+            () -> true,
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE
+            PageMemoryImpl.ThrottlingPolicy.NONE,
+            null
         );
 
         mem.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
new file mode 100644
index 0000000..94e30e7
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.logger.NullLogger;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for delayed page replacement mode.
+ */
+public class IgnitePageMemReplaceDelayedWriteUnitTest {
+    /** CPU count. */
+    private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+    /** 1 megabyte in bytes */
+    private static final long MB = 1024L * 1024;
+
+    /** Logger. */
+    private IgniteLogger log = new NullLogger();
+
+    /** Page memory, published here for backward call from replacement write 
callback. */
+    private PageMemoryImpl pageMemory;
+
+    /**
+     * Test delayed eviction causes locking in page reads
+     * @throws IgniteCheckedException if failed.
+     */
+    @Test
+    public void testReplacementWithDelayCausesLockForRead() throws 
IgniteCheckedException {
+        IgniteConfiguration cfg = getConfiguration(MB);
+
+        AtomicInteger totalEvicted = new AtomicInteger();
+
+        ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer 
byteBuf, int tag) -> {
+            log.info("Evicting " + fullPageId);
+
+            assert getLockedPages(fullPageId).contains(fullPageId);
+
+            assert !getSegment(fullPageId).writeLock().isHeldByCurrentThread();
+
+            totalEvicted.incrementAndGet();
+        };
+
+        int pageSize = 4096;
+        PageMemoryImpl memory = createPageMemory(cfg, pageWriter, pageSize);
+
+        this.pageMemory = memory;
+
+        long pagesTotal = 
cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().getMaxSize()
 / pageSize;
+        long markDirty = pagesTotal * 2 / 3;
+        for (int i = 0; i < markDirty; i++) {
+            long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
+            long ptr = memory.acquirePage(1, pageId);
+
+            memory.releasePage(1, pageId, ptr);
+        }
+
+        GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint();
+        int cpPages = ids.size();
+        log.info("Started CP with [" + cpPages + "] pages in it, created [" + 
markDirty + "] pages");
+
+        for (int i = 0; i < cpPages; i++) {
+            long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
+            long ptr = memory.acquirePage(1, pageId);
+            memory.releasePage(1, pageId, ptr);
+        }
+
+        List<Collection<FullPageId>> stripes = getAllLockedPages();
+
+        assert !stripes.isEmpty();
+
+        for (Collection<FullPageId> pageIds : stripes) {
+            assert pageIds.isEmpty();
+        }
+
+        assert totalEvicted.get() > 0;
+
+        memory.stop();
+    }
+
+    /**
+     * Test delayed eviction causes locking in page reads
+     * @throws IgniteCheckedException if failed.
+     */
+    @Test
+    public void testBackwardCompatibilityMode() throws IgniteCheckedException {
+        IgniteConfiguration cfg = getConfiguration(MB);
+
+        AtomicInteger totalEvicted = new AtomicInteger();
+
+        ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer 
byteBuf, int tag) -> {
+            log.info("Evicting " + fullPageId);
+
+            assert getSegment(fullPageId).writeLock().isHeldByCurrentThread();
+
+            totalEvicted.incrementAndGet();
+        };
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE, 
"false");
+        int pageSize = 4096;
+        PageMemoryImpl memory;
+
+        try {
+            memory = createPageMemory(cfg, pageWriter, pageSize);
+        }
+        finally {
+            
System.clearProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE);
+        }
+
+        this.pageMemory = memory;
+
+        long pagesTotal = 
cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().getMaxSize()
 / pageSize;
+        long markDirty = pagesTotal * 2 / 3;
+        for (int i = 0; i < markDirty; i++) {
+            long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
+            long ptr = memory.acquirePage(1, pageId);
+
+            memory.releasePage(1, pageId, ptr);
+        }
+
+        GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint();
+        int cpPages = ids.size();
+        log.info("Started CP with [" + cpPages + "] pages in it, created [" + 
markDirty + "] pages");
+
+        for (int i = 0; i < cpPages; i++) {
+            long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA);
+            long ptr = memory.acquirePage(1, pageId);
+            memory.releasePage(1, pageId, ptr);
+        }
+
+        assert totalEvicted.get() > 0;
+
+        memory.stop();
+    }
+
+    /**
+     * @param fullPageId page ID to determine segment for
+     * @return segment related
+     */
+    @SuppressWarnings("TypeMayBeWeakened") private ReentrantReadWriteLock 
getSegment(FullPageId fullPageId) {
+        ReentrantReadWriteLock[] segments = U.field(pageMemory, "segments");
+
+        int idx = PageMemoryImpl.segmentIndex(fullPageId.groupId(), 
fullPageId.pageId(),
+            segments.length);
+
+        return segments[idx];
+    }
+
+    /**
+     * @param cfg configuration
+     * @param pageWriter writer for page replacement.
+     * @param pageSize page size
+     * @return implementation for test
+     */
+    @NotNull
+    private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, 
ReplacedPageWriter pageWriter, int pageSize) {
+        IgniteCacheDatabaseSharedManager db = 
mock(GridCacheDatabaseSharedManager.class);
+
+        when(db.checkpointLockIsHeldByThread()).thenReturn(true);
+
+        GridCacheSharedContext sctx = 
Mockito.mock(GridCacheSharedContext.class);
+
+        when(sctx.pageStore()).thenReturn(new NoOpPageStoreManager());
+        when(sctx.wal()).thenReturn(new NoOpWALManager());
+        when(sctx.database()).thenReturn(db);
+        when(sctx.logger(any(Class.class))).thenReturn(log);
+
+        GridKernalContext kernalCtx = mock(GridKernalContext.class);
+
+        when(kernalCtx.config()).thenReturn(cfg);
+        when(sctx.kernalContext()).thenReturn(kernalCtx);
+
+        DataRegionConfiguration regCfg = 
cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration();
+
+        DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(regCfg);
+
+        long[] sizes = prepareSegmentSizes(regCfg.getMaxSize());
+
+        DirectMemoryProvider provider = new UnsafeMemoryProvider(log);
+
+        PageMemoryImpl memory
+            = new PageMemoryImpl(provider, sizes, sctx, pageSize,
+            pageWriter, null, () -> true, memMetrics, null, null);
+
+        memory.start();
+        return memory;
+    }
+
+    /**
+     * @param overallSize default region size in bytes
+     * @return configuration for test.
+     */
+    @NotNull private IgniteConfiguration getConfiguration(long overallSize) {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(overallSize)));
+
+        return cfg;
+    }
+
+    /**
+     * @return collection with pages locked
+     * @param fullPageId page id to check lock.
+     */
+    private Collection<FullPageId> getLockedPages(FullPageId fullPageId) {
+        Object tracker = delayedReplacementTracker();
+
+        Object[] stripes = U.field(tracker, "stripes");
+
+        int idx = PageMemoryImpl.segmentIndex(fullPageId.groupId(), 
fullPageId.pageId(),
+            stripes.length);
+
+        return U.field(stripes[idx], "locked");
+    }
+
+    /**
+     * @return delayed write tracked from page memory.
+     */
+    @NotNull private Object delayedReplacementTracker() {
+        Object tracker = U.field(pageMemory, "delayedPageReplacementTracker");
+
+        if (tracker == null)
+            throw new IllegalStateException("Delayed replacement is not 
configured");
+
+        return tracker;
+    }
+
+    /**
+     * @return all locked pages stripes underlying collectinos
+     */
+    private List<Collection<FullPageId>> getAllLockedPages() {
+        Object tracker = delayedReplacementTracker();
+
+        Object[] stripes = U.field(tracker, "stripes");
+
+        Stream<Collection<FullPageId>> locked = 
Arrays.asList(stripes).stream().map(stripe ->
+            (Collection<FullPageId>)U.field(stripe, "locked"));
+
+        return locked.collect(Collectors.toList());
+    }
+
+    /**
+     * @param overallSize all regions size
+     * @return segments size, cp pool is latest array element.
+     */
+    private long[] prepareSegmentSizes(long overallSize) {
+        int segments = CPUS;
+        long[] sizes = new long[segments + 1];
+
+        for (int i = 0; i < sizes.length; i++)
+            sizes[i] = overallSize / segments;
+
+        sizes[segments] = overallSize / 100;
+
+        return sizes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
index 054696c..1cef087 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteLogger;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
+import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.logger.NullLogger;
 import org.junit.Test;
@@ -42,6 +44,9 @@ public class IgniteThrottlingUnitTest {
     /** Page memory 2 g. */
     private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class);
 
+    /** State checker. */
+    private CheckpointLockStateChecker stateChecker = () -> true;
+
     {
         when(pageMemory2g.totalPages()).thenReturn((2L * 1024 * 1024 * 1024) / 
4096);
     }
@@ -51,7 +56,7 @@ public class IgniteThrottlingUnitTest {
      */
     @Test
     public void breakInCaseTooFast() {
-        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
         long time = throttle.getParkTime(0.67,
             (362584 + 67064) / 2,
@@ -68,7 +73,7 @@ public class IgniteThrottlingUnitTest {
      */
     @Test
     public void noBreakIfNotFastWrite() {
-        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
         long time = throttle.getParkTime(0.47,
             ((362584 + 67064) / 2),
@@ -150,7 +155,7 @@ public class IgniteThrottlingUnitTest {
      */
     @Test
     public void beginOfCp() {
-        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
         assertTrue(throttle.getParkTime(0.01, 100,400000,
             1,
@@ -177,7 +182,7 @@ public class IgniteThrottlingUnitTest {
      */
     @Test
     public void enforceThrottleAtTheEndOfCp() {
-        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
         long time1 = throttle.getParkTime(0.70, 300000, 400000,
             1, 20200, 23000);
@@ -200,7 +205,7 @@ public class IgniteThrottlingUnitTest {
      */
     @Test
     public void tooMuchPagesMarkedDirty() {
-        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log);
 
        // 363308       350004  348976  10604
         long time = throttle.getParkTime(0.75,
@@ -234,11 +239,10 @@ public class IgniteThrottlingUnitTest {
         }).when(log).info(anyString());
 
         AtomicInteger written = new AtomicInteger();
-        GridCacheDatabaseSharedManager db = 
mock(GridCacheDatabaseSharedManager.class);
-        when(db.checkpointLockIsHeldByThread()).thenReturn(true);
-        when(db.writtenPagesCounter()).thenReturn(written);
+        CheckpointWriteProgressSupplier cpProgress = 
mock(CheckpointWriteProgressSupplier.class);
+        when(cpProgress.writtenPagesCounter()).thenReturn(written);
 
-        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, db, log) {
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, cpProgress, stateChecker, log) {
             @Override protected void doPark(long throttleParkTimeNs) {
                 //do nothing
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
index 04f3bd0..4495dc1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.internal.mem.DirectMemoryProvider;
 import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
@@ -26,11 +25,10 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
-import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.database.IndexStorageSelfTest;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
-import org.apache.ignite.internal.util.typedef.CIX3;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 
@@ -85,21 +83,17 @@ public class IndexStoragePageMemoryImplTest extends 
IndexStorageSelfTest {
             provider, sizes,
             sharedCtx,
             PAGE_SIZE,
-            new CIX3<FullPageId, ByteBuffer, Integer>() {
-                @Override public void applyx(FullPageId fullPageId, ByteBuffer 
byteBuf, Integer tag) {
-                    assert false : "No evictions should happen during the 
test";
-                }
+            (fullPageId, byteBuf, tag) -> {
+                assert false : "No page replacement (rotation with disk) 
should happen during the test";
             },
             new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
                 @Override public void applyx(Long page, FullPageId fullId, 
PageMemoryEx pageMem) {
                 }
-            }, new CheckpointLockStateChecker() {
-                @Override public boolean checkpointLockIsHeldByThread() {
-                    return true;
-                }
             },
+            () -> true,
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE
+            PageMemoryImpl.ThrottlingPolicy.NONE,
+            null
         );
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index ed285c5..3c169be 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.internal.mem.DirectMemoryProvider;
 import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
@@ -28,9 +27,8 @@ import 
org.apache.ignite.internal.pagemem.impl.PageMemoryNoLoadSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
-import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
-import org.apache.ignite.internal.util.typedef.CIX3;
+import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 
@@ -75,10 +73,8 @@ public class PageMemoryImplNoLoadTest extends 
PageMemoryNoLoadSelfTest {
             sizes,
             sharedCtx,
             PAGE_SIZE,
-            new CIX3<FullPageId, ByteBuffer, Integer>() {
-                @Override public void applyx(FullPageId fullPageId, ByteBuffer 
byteBuffer, Integer tag) {
-                    assert false : "No evictions should happen during the 
test";
-                }
+            (fullPageId, byteBuf, tag) -> {
+                assert false : "No page replacement (rotation with disk) 
should happen during the test";
             },
             new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
                 @Override public void applyx(Long page, FullPageId fullId, 
PageMemoryEx pageMem) {
@@ -90,7 +86,7 @@ public class PageMemoryImplNoLoadTest extends 
PageMemoryNoLoadSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE
+            PageMemoryImpl.ThrottlingPolicy.NONE, null
         );
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 1369f28..8f0ef39 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -28,11 +27,10 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
-import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
-import org.apache.ignite.internal.util.typedef.CIX3;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -108,10 +106,8 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             sizes,
             sharedCtx,
             PAGE_SIZE,
-            new CIX3<FullPageId, ByteBuffer, Integer>() {
-                @Override public void applyx(FullPageId fullPageId, ByteBuffer 
byteBuf, Integer tag) {
-                    assert false : "No evictions should happen during the 
test";
-                }
+            (fullPageId, byteBuf, tag) -> {
+                assert false : "No page replacement (rotation with disk) 
should happen during the test";
             },
             new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
                 @Override public void applyx(Long page, FullPageId fullId, 
PageMemoryEx pageMem) {
@@ -122,7 +118,8 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
                 }
             },
             new 
DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()),
-            PageMemoryImpl.ThrottlingPolicy.NONE
+            PageMemoryImpl.ThrottlingPolicy.NONE,
+            null
         );
 
         mem.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
index a2bfeb3..9906429 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.testsuites;
 
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.IgnitePageMemReplaceDelayedWriteUnitTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.IgniteThrottlingUnitTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -25,7 +26,8 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    IgniteThrottlingUnitTest.class
+    IgniteThrottlingUnitTest.class,
+    IgnitePageMemReplaceDelayedWriteUnitTest.class
 })
 public class IgnitePdsUnitTestSuite {
 }

Reply via email to