Repository: ignite
Updated Branches:
  refs/heads/master 1649c532f -> f12055115


IGNITE-6334 Throttle writing threads during ongoing checkpoint - Fixes #2710.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1205511
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1205511
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1205511

Branch: refs/heads/master
Commit: f120551159fd2eeed8cedc0ca3e3ddc394505737
Parents: 1649c53
Author: Ivan Rakov <ivan.glu...@gmail.com>
Authored: Fri Sep 22 12:40:22 2017 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Sep 22 12:40:22 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   5 +
 .../PersistentStoreConfiguration.java           |  28 +-
 .../GridCacheDatabaseSharedManager.java         |  48 ++-
 .../persistence/pagemem/PageMemoryImpl.java     |  96 +++++-
 .../persistence/pagemem/PagesWriteThrottle.java | 104 ++++++
 .../pagemem/BPlusTreePageMemoryImplTest.java    |   4 +-
 .../BPlusTreeReuseListPageMemoryImplTest.java   |   3 +-
 .../MetadataStoragePageMemoryImplTest.java      |   4 +-
 .../pagemem/PageMemoryImplNoLoadTest.java       |   4 +-
 .../persistence/pagemem/PageMemoryImplTest.java |   4 +-
 .../pagemem/PagesWriteThrottleSandboxTest.java  | 264 +++++++++++++++
 .../pagemem/PagesWriteThrottleSmokeTest.java    | 322 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite.java   |   4 +
 13 files changed, 866 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 5f1839b..39e65e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Properties;
 import javax.net.ssl.HostnameVerifier;
 import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
@@ -726,6 +727,10 @@ public final class IgniteSystemProperties {
      */
     public static final String IGNITE_WAL_LOG_TX_RECORDS = 
"IGNITE_WAL_LOG_TX_RECORDS";
 
+    /** If this property is set to {@code true}, {@link 
PersistentStoreConfiguration#writeThrottlingEnabled} will be overridden to true
+     * regardless of the initial value in configuration. */
+    public static final String IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED = 
"IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index abca5a5..c44e92d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -84,6 +84,9 @@ public class PersistentStoreConfiguration implements 
Serializable {
     /** Default wal archive directory. */
     public static final String DFLT_WAL_ARCHIVE_PATH = "db/wal/archive";
 
+    /** Default write throttling enabled. */
+    public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false;
+
     /** */
     private String persistenceStorePath;
 
@@ -162,6 +165,11 @@ public class PersistentStoreConfiguration implements 
Serializable {
     private long walAutoArchiveAfterInactivity = -1;
 
     /**
+     * If true, threads that generate dirty pages too fast during ongoing 
checkpoint will be throttled.
+     */
+    private boolean writeThrottlingEnabled = DFLT_WRITE_THROTTLING_ENABLED;
+
+    /**
      * Returns a path the root directory where the Persistent Store will 
persist data and indexes.
      */
     public String getPersistentStorePath() {
@@ -240,7 +248,7 @@ public class PersistentStoreConfiguration implements 
Serializable {
     /**
      * Sets a number of threads to use for the checkpointing purposes.
      *
-     * @param checkpointingThreads Number of checkpointing threads. One thread 
is used by default.
+     * @param checkpointingThreads Number of checkpointing threads. Four 
threads are used by default.
      * @return {@code this} for chaining.
      */
     public PersistentStoreConfiguration setCheckpointingThreads(int 
checkpointingThreads) {
@@ -402,6 +410,24 @@ public class PersistentStoreConfiguration implements 
Serializable {
     }
 
     /**
+     * Gets flag indicating whether write throttling is enabled.
+     */
+    public boolean isWriteThrottlingEnabled() {
+        return writeThrottlingEnabled;
+    }
+
+    /**
+     * Sets flag indicating whether write throttling is enabled.
+     *
+     * @param writeThrottlingEnabled Write throttling enabled flag.
+     */
+    public PersistentStoreConfiguration setWriteThrottlingEnabled(boolean 
writeThrottlingEnabled) {
+        this.writeThrottlingEnabled = writeThrottlingEnabled;
+
+        return this;
+    }
+
+    /**
      * Gets the length of the time interval for rate-based metrics. This 
interval defines a window over which
      * hits will be tracked. Default value is {@link 
#DFLT_RATE_TIME_INTERVAL_MILLIS}.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 5f03d8f..62210dc 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -301,6 +301,12 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** */
     private ObjectName persistenceMetricsMbeanName;
 
+    /** Counter for written checkpoint pages. Not null only if checkpoint is 
running. */
+    private volatile AtomicInteger writtenPagesCntr = null;
+
+    /** Number of pages in current checkpoint. */
+    private volatile int currCheckpointPagesCnt;
+
     /**
      * @param ctx Kernal context.
      */
@@ -667,6 +673,11 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 "Checkpoint page buffer size is too big, setting to an 
adjusted cache size [size="
                     + U.readableSize(cacheSize, false) + ",  memPlc=" + 
plcCfg.getName() + ']');
 
+        boolean writeThrottlingEnabled = 
persistenceCfg.isWriteThrottlingEnabled();
+
+        if 
(IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED,
 false))
+            writeThrottlingEnabled = true;
+
         PageMemoryImpl pageMem = new PageMemoryImpl(
             memProvider,
             calculateFragmentSizes(
@@ -699,7 +710,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 }
             },
             this,
-            memMetrics
+            memMetrics,
+            writeThrottlingEnabled
         );
 
         memMetrics.pageMemory(pageMem);
@@ -942,7 +954,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
-     * Gets the checkpoint read lock. While this lock is held, checkpoint 
thread will not acquiSnapshotWorkerre memory
+     * Gets the checkpoint read lock. While this lock is held, checkpoint 
thread will not acquire memory
      * state.
      */
     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
@@ -1906,6 +1918,20 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Counter for written checkpoint pages. Not null only if checkpoint is 
running.
+     */
+    public AtomicInteger writtenPagesCounter() {
+        return writtenPagesCntr;
+    }
+
+    /**
+     * @return Number of pages in current checkpoint. If checkpoint is not 
running, returns 0.
+     */
+    public int currentCheckpointPagesCount() {
+        return currCheckpointPagesCnt;
+    }
+
+    /**
      * @param cpTs Checkpoint timestamp.
      * @param cpId Checkpoint ID.
      * @param type Checkpoint type.
@@ -2038,6 +2064,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                 Checkpoint chp = markCheckpointBegin(tracker);
 
+                currCheckpointPagesCnt = chp.pagesSize;
+
+                writtenPagesCntr = new AtomicInteger();
+
                 boolean interrupted = true;
 
                 try {
@@ -2049,7 +2079,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                             asyncRunner == null ? 1 : 
chp.cpPages.collectionsSize());
 
                         tracker.onPagesWriteStart();
-                        final AtomicInteger writtenPagesCtr = new 
AtomicInteger();
+
                         final int totalPagesToWriteCnt = chp.cpPages.size();
 
                         if (asyncRunner != null) {
@@ -2059,7 +2089,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                                     chp.cpPages.innerCollection(i),
                                     updStores,
                                     doneWriteFut,
-                                    writtenPagesCtr,
                                     totalPagesToWriteCnt
                                 );
 
@@ -2078,7 +2107,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                                 chp.cpPages,
                                 updStores,
                                 doneWriteFut,
-                                writtenPagesCtr,
                                 totalPagesToWriteCnt);
 
                             write.run();
@@ -2402,6 +2430,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         chp.cpEntry.checkpointMark(),
                         null,
                         CheckpointEntryType.END);
+
+                writtenPagesCntr = null;
+
+                currCheckpointPagesCnt = 0;
             }
 
             checkpointHist.onCheckpointFinished(chp);
@@ -2498,9 +2530,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         /** */
         private CountDownFuture doneFut;
 
-        /** Counter for all written pages. May be shared between several 
workers */
-        private AtomicInteger writtenPagesCntr;
-
         /** Total pages to write, counter may be greater than {@link 
#writePageIds} size */
         private final int totalPagesToWrite;
 
@@ -2511,7 +2540,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
          * @param writePageIds Collection of page IDs to write.
          * @param updStores
          * @param doneFut
-         * @param writtenPagesCntr all written pages counter, may be shared 
between several write tasks
          * @param totalPagesToWrite total pages to be written under this 
checkpoint
          */
         private WriteCheckpointPages(
@@ -2519,13 +2547,11 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             final Collection<FullPageId> writePageIds,
             final GridConcurrentHashSet<PageStore> updStores,
             final CountDownFuture doneFut,
-            @NotNull final AtomicInteger writtenPagesCntr,
             final int totalPagesToWrite) {
             this.tracker = tracker;
             this.writePageIds = writePageIds;
             this.updStores = updStores;
             this.doneFut = doneFut;
-            this.writtenPagesCntr = writtenPagesCntr;
             this.totalPagesToWrite = totalPagesToWrite;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index dbb64f8..1da17b5 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -56,6 +57,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
@@ -179,6 +181,9 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** State checker. */
     private final CheckpointLockStateChecker stateChecker;
 
+    /** Number of used pages in checkpoint buffer. */
+    private final AtomicInteger cpBufPagesCntr = new AtomicInteger(0);
+
     /** */
     private ExecutorService asyncRunner = new ThreadPoolExecutor(
         0,
@@ -217,6 +222,12 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Flush dirty page closure. When possible, will be called by 
evictPage(). */
     private final GridInClosure3X<Long, FullPageId, PageMemoryEx> 
changeTracker;
 
+    /** Pages write throttle. */
+    private PagesWriteThrottle writeThrottle;
+
+    /** Write throttle enabled flag. */
+    private boolean throttleEnabled;
+
     /**  */
     private boolean pageEvictWarned;
 
@@ -232,6 +243,7 @@ public class PageMemoryImpl implements PageMemoryEx {
      * @param pageSize Page size.
      * @param flushDirtyPage Callback invoked when a dirty page is evicted.
      * @param changeTracker Callback invoked to track changes in pages.
+     * @param throttleEnabled Write throttle enabled flag.
      */
     public PageMemoryImpl(
         DirectMemoryProvider directMemoryProvider,
@@ -241,7 +253,8 @@ public class PageMemoryImpl implements PageMemoryEx {
         GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage,
         GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker,
         CheckpointLockStateChecker stateChecker,
-        MemoryMetricsImpl memMetrics
+        MemoryMetricsImpl memMetrics,
+        boolean throttleEnabled
     ) {
         assert sharedCtx != null;
 
@@ -253,6 +266,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         this.flushDirtyPage = flushDirtyPage;
         this.changeTracker = changeTracker;
         this.stateChecker = stateChecker;
+        this.throttleEnabled = throttleEnabled;
 
         storeMgr = sharedCtx.pageStore();
         walMgr = sharedCtx.wal();
@@ -290,7 +304,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         DirectMemoryRegion cpReg = regions.get(regs - 1);
 
-        checkpointPool = new PagePool(regs - 1, cpReg);
+        checkpointPool = new PagePool(regs - 1, cpReg, cpBufPagesCntr);
 
         long checkpointBuf = cpReg.size();
 
@@ -305,12 +319,14 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             totalAllocated += reg.size();
 
-            segments[i] = new Segment(i, regions.get(i), 
checkpointPool.pages() / segments.length);
+            segments[i] = new Segment(i, regions.get(i), 
checkpointPool.pages() / segments.length, throttleEnabled);
 
             pages += segments[i].pages();
             totalTblSize += segments[i].tableSize();
         }
 
+        initWriteThrottle();
+
         if (log.isInfoEnabled())
             log.info("Started page memory [memoryAllocated=" + 
U.readableSize(totalAllocated, false) +
                 ", pages=" + pages +
@@ -319,6 +335,21 @@ public class PageMemoryImpl implements PageMemoryEx {
                 ']');
     }
 
+    /**
+     *
+     */
+    private void initWriteThrottle() {
+        if (!(sharedCtx.database() instanceof GridCacheDatabaseSharedManager)) 
{
+            log.error("Write throttle can't start. Unexpected class of 
database manager: " +
+                sharedCtx.database().getClass());
+
+            throttleEnabled = false;
+        }
+
+        if (throttleEnabled)
+            writeThrottle = new PagesWriteThrottle(this, 
(GridCacheDatabaseSharedManager)sharedCtx.database());
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("OverlyStrongTypeCast")
     @Override public void stop() throws IgniteException {
@@ -774,6 +805,18 @@ public class PageMemoryImpl implements PageMemoryEx {
         return true;
     }
 
+    /**
+     * @param dirtyRatioThreshold Throttle threshold.
+     */
+    boolean shouldThrottle(double dirtyRatioThreshold) {
+        for (Segment segment : segments) {
+            if (segment.shouldThrottle(dirtyRatioThreshold))
+                return true;
+        }
+
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() 
throws IgniteException {
         Collection[] collections = new Collection[segments.length];
@@ -799,6 +842,9 @@ public class PageMemoryImpl implements PageMemoryEx {
     @Override public void finishCheckpoint() {
         for (Segment seg : segments)
             seg.segCheckpointPages = null;
+
+        if (throttleEnabled)
+            writeThrottle.onFinishCheckpoint();
     }
 
     /** {@inheritDoc} */
@@ -1219,6 +1265,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         try {
             rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, 
PageIdUtils.tag(pageId));
+
+            if (throttleEnabled && !restore && markDirty && !dirty)
+                writeThrottle.onMarkDirty(isInCheckpoint(fullId));
         }
         catch (AssertionError ex) {
             StringBuilder sb = new StringBuilder(sysPageSize * 2);
@@ -1310,6 +1359,20 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /**
+     * Number of used pages in checkpoint buffer.
+     */
+    public int checkpointBufferPagesCount() {
+        return cpBufPagesCntr.get();
+    }
+
+    /**
+     * Total number of pages the checkpoint buffer can hold (its capacity).
+     */
+    public int checkpointBufferPagesSize() {
+        return checkpointPool.pages();
+    }
+
+    /**
      * This method must be called in synchronized context.
      *
      * @param absPtr Absolute pointer.
@@ -1385,6 +1448,9 @@ public class PageMemoryImpl implements PageMemoryEx {
         /** Direct memory region. */
         protected final DirectMemoryRegion region;
 
+        /** Pool pages counter. */
+        protected final AtomicInteger pagesCntr;
+
         /** */
         protected long lastAllocatedIdxPtr;
 
@@ -1397,10 +1463,12 @@ public class PageMemoryImpl implements PageMemoryEx {
         /**
          * @param idx Index.
          * @param region Region
+         * @param pagesCntr Pages counter.
          */
-        protected PagePool(int idx, DirectMemoryRegion region) {
+        protected PagePool(int idx, DirectMemoryRegion region, AtomicInteger 
pagesCntr) {
             this.idx = idx;
             this.region = region;
+            this.pagesCntr = pagesCntr;
 
             long base = (region.address() + 7) & ~0x7;
 
@@ -1427,6 +1495,9 @@ public class PageMemoryImpl implements PageMemoryEx {
          * @throws GridOffHeapOutOfMemoryException If failed to allocate new 
free page.
          */
         private long borrowOrAllocateFreePage(long pageId) throws 
GridOffHeapOutOfMemoryException {
+            if (pagesCntr != null)
+                pagesCntr.getAndIncrement();
+
             long relPtr = borrowFreePage();
 
             return relPtr != INVALID_REL_PTR ? relPtr : 
allocateFreePage(pageId);
@@ -1500,6 +1571,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + 
PageHeader.fullPageId(absPtr);
 
+            if (pagesCntr != null)
+                pagesCntr.getAndDecrement();
+
             while (true) {
                 long freePageRelPtrMasked = 
GridUnsafe.getLong(freePageListPtr);
 
@@ -1580,8 +1654,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         /**
          * @param region Memory region.
+         * @param throttlingEnabled Write throttling enabled flag.
          */
-        private Segment(int idx, DirectMemoryRegion region, int cpPoolPages) {
+        private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, 
boolean throttlingEnabled) {
             long totalMemory = region.size();
 
             int pages = (int)(totalMemory / sysPageSize);
@@ -1596,9 +1671,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             DirectMemoryRegion poolRegion = region.slice(memPerTbl + 8);
 
-            pool = new PagePool(idx, poolRegion);
+            pool = new PagePool(idx, poolRegion, null);
 
-            maxDirtyPages = Math.min(pool.pages() * 2 / 3, cpPoolPages);
+            maxDirtyPages = throttlingEnabled ? pool.pages() * 3 / 4 : 
Math.min(pool.pages() * 2 / 3, cpPoolPages);
         }
 
         /**
@@ -1609,6 +1684,13 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
 
         /**
+         * @param dirtyRatioThreshold Throttle threshold.
+         */
+        private boolean shouldThrottle(double dirtyRatioThreshold) {
+            return ((double)dirtyPages.size()) / pages() > dirtyRatioThreshold;
+        }
+
+        /**
          * @return Max number of pages this segment can allocate.
          */
         private int pages() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
new file mode 100644
index 0000000..d0c67c7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -0,0 +1,104 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid throughput drops to zero that can happen if the checkpoint buffer 
overflows.
+ */
+public class PagesWriteThrottle {
+    /** Page memory. */
+    private final PageMemoryImpl pageMemory;
+
+    /** Database manager. */
+    private final GridCacheDatabaseSharedManager dbSharedMgr;
+
+    /** Starting throttle time. Limits write speed to 1000 MB/s. */
+    private static final long STARTING_THROTTLE_NANOS = 4000;
+
+    /** Backoff ratio. Each subsequent park will be this many times longer than the previous one. */
+    private static final double BACKOFF_RATIO = 1.05;
+
+    /** Exponential backoff counter. */
+    private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0);
+    /**
+     * @param pageMemory Page memory.
+     * @param dbSharedMgr Database manager.
+     */
+    public PagesWriteThrottle(PageMemoryImpl pageMemory, 
GridCacheDatabaseSharedManager dbSharedMgr) {
+        this.pageMemory = pageMemory;
+        this.dbSharedMgr = dbSharedMgr;
+    }
+
+    /**
+     *
+     */
+    public void onMarkDirty(boolean isInCheckpoint) {
+        assert dbSharedMgr.checkpointLockIsHeldByThread();
+
+        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+
+        if (writtenPagesCntr == null)
+            return; // Don't throttle if checkpoint is not running.
+
+        boolean shouldThrottle = false;
+
+        if (isInCheckpoint) {
+            int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 
2 / 3;
+
+            shouldThrottle = pageMemory.checkpointBufferPagesCount() > 
checkpointBufLimit;
+        }
+
+        if (!shouldThrottle) {
+            int cpWrittenPages = writtenPagesCntr.get();
+
+            int cpTotalPages = dbSharedMgr.currentCheckpointPagesCount();
+
+            if (cpWrittenPages == cpTotalPages) {
+                // Checkpoint is already in fsync stage, increasing maximum 
ratio of dirty pages to 3/4
+                shouldThrottle = pageMemory.shouldThrottle(3.0 / 4);
+            } else {
+                double dirtyRatioThreshold = ((double)cpWrittenPages) / 
cpTotalPages;
+
+                // Starting with 0.05 to avoid throttle right after checkpoint 
start
+                // 7/12 is maximum ratio of dirty pages
+                dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 
/ 12;
+
+                shouldThrottle = 
pageMemory.shouldThrottle(dirtyRatioThreshold);
+            }
+        }
+
+        if (shouldThrottle) {
+            int throttleLevel = exponentialBackoffCntr.getAndIncrement();
+
+            LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * 
Math.pow(BACKOFF_RATIO, throttleLevel)));
+        }
+        else
+            exponentialBackoffCntr.set(0);
+    }
+
+    /**
+     *
+     */
+    public void onFinishCheckpoint() {
+        exponentialBackoffCntr.set(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 6f58782..56d09f8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -82,7 +82,9 @@ public class BPlusTreePageMemoryImplTest extends 
BPlusTreeSelfTest {
                     return true;
                 }
             },
-            new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+            new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+            false
+        );
 
         mem.start();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index b263d4f..39183b2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -82,7 +82,8 @@ public class BPlusTreeReuseListPageMemoryImplTest extends 
BPlusTreeReuseSelfTest
                     return true;
                 }
             },
-            new MemoryMetricsImpl(new MemoryPolicyConfiguration())
+            new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+            false
         );
 
         mem.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
index d9257bd..a427c63 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
@@ -97,6 +97,8 @@ public class MetadataStoragePageMemoryImplTest extends 
MetadataStorageSelfTest{
                     return true;
                 }
             },
-            new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+            new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+            false
+        );
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 1fff1f0..467ede4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -88,7 +88,9 @@ public class PageMemoryImplNoLoadTest extends 
PageMemoryNoLoadSelfTest {
                     return true;
                 }
             },
-            new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+            new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+            false
+        );
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 0366eca..c5997fa 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -110,7 +110,9 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
                     return true;
                 }
             },
-            new MemoryMetricsImpl(new MemoryPolicyConfiguration()));
+            new MemoryMetricsImpl(new MemoryPolicyConfiguration()),
+            false
+        );
 
         mem.start();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
new file mode 100644
index 0000000..409ab84
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java
@@ -0,0 +1,264 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.MemoryMetrics;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test to visualize and debug {@link PagesWriteThrottle}.
+ * Prints puts/gets rate, number of dirty pages, pages written in current 
checkpoint and pages in checkpoint buffer.
+ * Not intended to be part of any test suite.
+ */
+public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+        discoverySpi.setIpFinder(ipFinder);
+
+        MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        dbCfg.setMemoryPolicies(new MemoryPolicyConfiguration()
+            .setMaxSize(4000L * 1024 * 1024)
+            .setName("dfltMemPlc")
+            .setMetricsEnabled(true));
+
+        dbCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration();
+
+        ccfg1.setName(CACHE_NAME);
+        ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        
ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 64));
+
+        cfg.setCacheConfiguration(ccfg1);
+
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setWalMode(WALMode.BACKGROUND)
+                .setCheckpointingFrequency(20_000)
+                .setCheckpointingPageBufferSize(1000L * 1000 * 1000)
+                .setWriteThrottlingEnabled(true));
+
+        cfg.setConsistentId(gridName);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 100 * 60 * 1000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testThrottle() throws Exception {
+        startGrids(1).active(true);
+
+        try {
+            final Ignite ig = ignite(0);
+
+            final int keyCnt = 4_000_000;
+
+            final AtomicBoolean run = new AtomicBoolean(true);
+
+            final HitRateMetrics getRate = new HitRateMetrics(5000, 5);
+
+            GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    while (run.get()) {
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        int key = rnd.nextInt(keyCnt * 2);
+
+                        ignite(0).cache(CACHE_NAME).get(key);
+
+                        getRate.onHit();
+                    }
+
+                    return null;
+                }
+            }, 2, "read-loader");
+
+            final HitRateMetrics putRate = new HitRateMetrics(1000, 5);
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    while (run.get()) {
+                        long dirtyPages = 0;
+
+                        for (MemoryMetrics m : ig.memoryMetrics())
+                            if (m.getName().equals("dfltMemPlc"))
+                                dirtyPages = m.getDirtyPages();
+
+                        long cpBufPages = 0;
+
+                        long cpWrittenPages;
+
+                        AtomicInteger cntr = 
((GridCacheDatabaseSharedManager)(((IgniteEx)ignite(0))
+                            
.context().cache().context().database())).writtenPagesCounter();
+
+                        cpWrittenPages = cntr == null ? 0 : cntr.get();
+
+                        try {
+                            cpBufPages = 
((PageMemoryImpl)((IgniteEx)ignite(0)).context().cache().context().database()
+                                
.memoryPolicy("dfltMemPlc").pageMemory()).checkpointBufferPagesCount();
+                        }
+                        catch (IgniteCheckedException e) {
+                            e.printStackTrace();
+                        }
+
+                        System.out.println("@@@ putsPerSec=," + 
(putRate.getRate()) + ", getsPerSec=," + (getRate.getRate())  + ", 
dirtyPages=," + dirtyPages + ", cpWrittenPages=," + cpWrittenPages +", 
cpBufPages=," + cpBufPages);
+
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            }, "metrics-view");
+
+            try (IgniteDataStreamer<Object, Object> ds = 
ig.dataStreamer(CACHE_NAME)) {
+                ds.allowOverwrite(true);
+
+                for (int i = 0; i < keyCnt * 10; i++) {
+                    ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), 
new TestValue(ThreadLocalRandom.current().nextInt(),
+                        ThreadLocalRandom.current().nextInt()));
+
+                    putRate.onHit();
+                }
+            }
+
+            run.set(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Test value: two int fields plus a byte payload of random length (400 to 419 bytes).
+     */
+    private static class TestValue implements Serializable {
+        /** */
+        private final int v1;
+
+        /** */
+        private final int v2;
+
+        /** */
+        private byte[] payload = new byte[400 + 
ThreadLocalRandom.current().nextInt(20)];
+
+        /**
+         * @param v1 Value 1.
+         * @param v2 Value 2.
+         */
+        private TestValue(int v1, int v2) {
+            this.v1 = v1;
+            this.v2 = v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue val = (TestValue)o;
+
+            return v1 == val.v1 && v2 == val.v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = v1;
+
+            res = 31 * res + v2;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
"db", false));
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
"snapshot", false));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
new file mode 100644
index 0000000..12a601d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -0,0 +1,322 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Smoke test: fails if the put rate drops to zero while checkpoint writes are artificially slowed.
+ */
+public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Slow checkpoint enabled. */
+    private final AtomicBoolean slowCheckpointEnabled = new 
AtomicBoolean(true);
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+        discoverySpi.setIpFinder(ipFinder);
+
+        MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        dbCfg.setMemoryPolicies(new MemoryPolicyConfiguration()
+            .setMaxSize(400 * 1024 * 1024)
+            .setName("dfltMemPlc")
+            .setMetricsEnabled(true));
+
+        dbCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration();
+
+        ccfg1.setName(CACHE_NAME);
+        ccfg1.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        
ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 64));
+
+        cfg.setCacheConfiguration(ccfg1);
+
+        cfg.setPersistentStoreConfiguration(
+            new PersistentStoreConfiguration()
+                .setWalMode(WALMode.BACKGROUND)
+                .setCheckpointingFrequency(20_000)
+                .setCheckpointingPageBufferSize(200 * 1000 * 1000)
+                .setWriteThrottlingEnabled(true)
+                .setCheckpointingThreads(1)
+                .setFileIOFactory(new SlowCheckpointFileIOFactory()));
+
+        cfg.setConsistentId(gridName);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        deleteWorkFiles();
+
+        slowCheckpointEnabled.set(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 6 * 60 * 1000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testThrottle() throws Exception {
+        startGrids(2).active(true);
+
+        try {
+            Ignite ig = ignite(0);
+
+            final int keyCnt = 2_000_000;
+
+            final AtomicBoolean run = new AtomicBoolean(true);
+
+            final AtomicBoolean zeroDropdown = new AtomicBoolean(false);
+
+            final HitRateMetrics putRate10secs = new HitRateMetrics(10_000, 
20);
+
+            final HitRateMetrics putRate1sec = new HitRateMetrics(1_000, 20);
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        Thread.sleep(5000);
+
+                        while (run.get()) {
+                            System.out.println(
+                                "Put rate over last 10 seconds: " + 
(putRate10secs.getRate() / 10) +
+                                    " puts/sec, over last 1 second: " + 
putRate1sec.getRate());
+
+                            if (putRate10secs.getRate() == 0) {
+                                zeroDropdown.set(true);
+
+                                run.set(false);
+                            }
+
+                            Thread.sleep(1000);
+                        }
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    finally {
+                        run.set(false);
+                    }
+                }
+            }, "rate-checker");
+
+            final IgniteCache<Integer, TestValue> cache = 
ig.getOrCreateCache(CACHE_NAME);
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    long startTs = System.currentTimeMillis();
+
+                    for (int i = 0; i < keyCnt * 10 && 
System.currentTimeMillis() - startTs < 3 * 60 * 1000; i++) {
+                        if (!run.get())
+                            break;
+
+                        cache.put(ThreadLocalRandom.current().nextInt(keyCnt), 
new TestValue(ThreadLocalRandom.current().nextInt(),
+                            ThreadLocalRandom.current().nextInt()));
+
+                        putRate10secs.onHit();
+
+                        putRate1sec.onHit();
+                    }
+
+                    run.set(false);
+                }
+            }, "loader");
+
+            while (run.get())
+                LockSupport.parkNanos(10_000);
+
+            if (zeroDropdown.get()) {
+                slowCheckpointEnabled.set(false);
+
+                IgniteInternalFuture cpFut1 = 
((IgniteEx)ignite(0)).context().cache().context().database()
+                    .wakeupForCheckpoint("test");
+
+                IgniteInternalFuture cpFut2 = 
((IgniteEx)ignite(1)).context().cache().context().database()
+                    .wakeupForCheckpoint("test");
+
+                cpFut1.get();
+
+                cpFut2.get();
+
+                fail("Put rate degraded to zero for at least 10 seconds");
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Test value: two int fields plus a byte payload of random length (400 to 419 bytes).
+     */
+    private static class TestValue implements Serializable {
+        /** */
+        private final int v1;
+
+        /** */
+        private final int v2;
+
+        /** */
+        private byte[] payload = new byte[400 + 
ThreadLocalRandom.current().nextInt(20)];
+
+        /**
+         * @param v1 Value 1.
+         * @param v2 Value 2.
+         */
+        private TestValue(int v1, int v2) {
+            this.v1 = v1;
+            this.v2 = v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue val = (TestValue)o;
+
+            return v1 == val.v1 && v2 == val.v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = v1;
+
+            res = 31 * res + v2;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
"db", false));
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
"snapshot", false));
+    }
+
+    /**
+     * File I/O factory that emulates poor checkpoint write speed.
+     */
+    private class SlowCheckpointFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegateFactory = new 
RandomAccessFileIOFactory();
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, "rw");
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, String mode) throws 
IOException {
+            final FileIO delegate = delegateFactory.create(file, mode);
+
+            return new FileIODecorator(delegate) {
+                @Override public int write(ByteBuffer srcBuf) throws 
IOException {
+                    if (slowCheckpointEnabled.get() && 
Thread.currentThread().getName().contains("checkpoint"))
+                        LockSupport.parkNanos(5_000_000);
+
+                    return delegate.write(srcBuf);
+                }
+
+                @Override public int write(ByteBuffer srcBuf, long position) 
throws IOException {
+                    if (slowCheckpointEnabled.get() && 
Thread.currentThread().getName().contains("checkpoint"))
+                        LockSupport.parkNanos(5_000_000);
+
+                    return delegate.write(srcBuf, position);
+                }
+
+                @Override public void write(byte[] buf, int off, int len) 
throws IOException {
+                    if (slowCheckpointEnabled.get() && 
Thread.currentThread().getName().contains("checkpoint"))
+                        LockSupport.parkNanos(5_000_000);
+
+                    delegate.write(buf, off, len);
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index b2a1f65..ef7682f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTree
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.MetadataStoragePageMemoryImplTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest;
@@ -80,6 +81,9 @@ public class IgnitePdsTestSuite extends TestSuite {
 
         suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class);
 
+        // Write throttling
+        suite.addTestSuite(PagesWriteThrottleSmokeTest.class);
+
         return suite;
     }
 }

Reply via email to