IGNITE-7533 Throttle writing threads according to fsync progress - Fixes #3437.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b50aa5eb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b50aa5eb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b50aa5eb

Branch: refs/heads/ignite-7485-2
Commit: b50aa5eb0b3d8a641b574401b8d79e3ca612756a
Parents: e8bd98d
Author: dpavlov <dpav...@gridgain.com>
Authored: Wed Feb 7 10:37:51 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Wed Feb 7 10:37:51 2018 +0300

----------------------------------------------------------------------
 .../GridCacheDatabaseSharedManager.java         |  86 ++-
 .../pagemem/IntervalBasedMeasurement.java       | 304 +++++++++++
 .../persistence/pagemem/PageMemoryImpl.java     | 108 +++-
 .../pagemem/PagesWriteSpeedBasedThrottle.java   | 519 +++++++++++++++++++
 .../persistence/pagemem/PagesWriteThrottle.java |  21 +-
 .../pagemem/PagesWriteThrottlePolicy.java       |  39 ++
 ...gniteCheckpointDirtyPagesForLowLoadTest.java |  21 +-
 .../checkpoint/IgniteMassLoadSandboxTest.java   | 515 ++++++++++++++++++
 .../db/checkpoint/ProgressWatchdog.java         | 495 ++++++++++++++++++
 .../pagemem/BPlusTreePageMemoryImplTest.java    |   2 +-
 .../BPlusTreeReuseListPageMemoryImplTest.java   |   2 +-
 .../pagemem/IgniteThrottlingUnitTest.java       | 270 ++++++++++
 .../pagemem/IndexStoragePageMemoryImplTest.java |   2 +-
 .../pagemem/PageMemoryImplNoLoadTest.java       |   2 +-
 .../persistence/pagemem/PageMemoryImplTest.java |   2 +-
 .../ignite/testsuites/IgnitePdsTestSuite.java   |   1 -
 .../testsuites/IgnitePdsUnitTestSuite.java      |  31 ++
 .../testsuites/IgniteReproducingSuite.java      |   6 +-
 18 files changed, 2362 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 5dc81c5..bd80ec8 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -55,6 +55,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -129,7 +130,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -156,6 +156,7 @@ import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedHashMap;
 
 import static java.nio.file.StandardOpenOption.READ;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
@@ -355,7 +356,13 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Counter for written checkpoint pages. Not null only if checkpoint is 
running. */
     private volatile AtomicInteger writtenPagesCntr = null;
 
-    /** Number of pages in current checkpoint. */
+    /** Counter for fsynced checkpoint pages. Not null only if checkpoint is 
running. */
+    private volatile AtomicInteger syncedPagesCntr = null;
+
+    /** Counter for evicted checkpoint pages. Not null only if checkpoint is 
running. */
+    private volatile AtomicInteger evictedPagesCntr = null;
+
+    /** Number of pages in current checkpoint at the beginning of checkpoint. 
*/
     private volatile int currCheckpointPagesCnt;
 
     /** */
@@ -933,10 +940,18 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             chpBufSize = cacheSize;
         }
 
-        boolean writeThrottlingEnabled = 
persistenceCfg.isWriteThrottlingEnabled();
+        PageMemoryImpl.ThrottlingPolicy plc = 
persistenceCfg.isWriteThrottlingEnabled()
+            ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
+            : PageMemoryImpl.ThrottlingPolicy.NONE;
+
+        String val = 
IgniteSystemProperties.getString(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
 
-        if 
(IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED,
 false))
-            writeThrottlingEnabled = true;
+        if (val != null) {
+            if ("ratio".equalsIgnoreCase(val))
+                plc = PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED;
+            else if ("speed".equalsIgnoreCase(val) || Boolean.valueOf(val))
+                plc = PageMemoryImpl.ThrottlingPolicy.SPEED_BASED;
+        }
 
         GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
 
@@ -974,12 +989,17 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                     // Only after write we can write page into snapshot.
                     snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
+
+                    AtomicInteger cntr = evictedPagesCntr;
+
+                    if (cntr != null)
+                        cntr.incrementAndGet();
                 }
             },
             changeTracker,
             this,
             memMetrics,
-            writeThrottlingEnabled
+            plc
         );
 
         memMetrics.pageMemory(pageMem);
@@ -2610,6 +2630,20 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * @return Counter for fsynced checkpoint pages. Not null only if 
checkpoint is running.
+     */
+    public AtomicInteger syncedPagesCounter() {
+        return syncedPagesCntr;
+    }
+
+    /**
+     * @return Counter for evicted pages during current checkpoint. Not null 
only if checkpoint is running.
+     */
+    public AtomicInteger evictedPagesCntr() {
+        return evictedPagesCntr;
+    }
+
+    /**
      * @return Number of pages in current checkpoint. If checkpoint is not 
running, returns 0.
      */
     public int currentCheckpointPagesCount() {
@@ -2799,13 +2833,15 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 currCheckpointPagesCnt = chp.pagesSize;
 
                 writtenPagesCntr = new AtomicInteger();
+                syncedPagesCntr = new AtomicInteger();
+                evictedPagesCntr = new AtomicInteger();
 
                 boolean interrupted = true;
 
                 try {
                     if (chp.hasDelta()) {
                         // Identity stores set.
-                        GridConcurrentHashSet<PageStore> updStores = new 
GridConcurrentHashSet<>();
+                        ConcurrentLinkedHashMap<PageStore, LongAdder> 
updStores = new ConcurrentLinkedHashMap<>();
 
                         CountDownFuture doneWriteFut = new CountDownFuture(
                             asyncRunner == null ? 1 : 
chp.cpPages.collectionsSize());
@@ -2867,14 +2903,16 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         tracker.onFsyncStart();
 
                         if (!skipSync) {
-                            for (PageStore updStore : updStores) {
+                            for (Map.Entry<PageStore, LongAdder> updStoreEntry 
: updStores.entrySet()) {
                                 if (shutdownNow) {
                                     chp.progress.cpFinishFut.onDone(new 
NodeStoppingException("Node is stopping."));
 
                                     return;
                                 }
 
-                                updStore.sync();
+                                updStoreEntry.getKey().sync();
+
+                                
syncedPagesCntr.addAndGet(updStoreEntry.getValue().intValue());
                             }
                         }
                     }
@@ -3035,14 +3073,12 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     if (grp.isLocal() || !grp.walEnabled())
                         continue;
 
-                    List<GridDhtLocalPartition> locParts = new ArrayList<>();
+                    int locPartsSize = 0;
 
-                    for (GridDhtLocalPartition part : 
grp.topology().currentLocalPartitions())
-                        locParts.add(part);
+                    for (GridDhtLocalPartition ignored : 
grp.topology().currentLocalPartitions())
+                        locPartsSize++;
 
-                    Collections.sort(locParts, ASC_PART_COMPARATOR);
-
-                    CacheState state = new CacheState(locParts.size());
+                    CacheState state = new CacheState(locPartsSize);
 
                     for (GridDhtLocalPartition part : 
grp.topology().currentLocalPartitions()) {
                         state.addPartitionState(
@@ -3184,6 +3220,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 res.add(nextCpPagesCol);
             }
 
+            currCheckpointPagesCnt = pagesNum;
+
             return new IgniteBiTuple<>(res, pagesNum);
         }
 
@@ -3192,6 +3230,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
          */
         private void markCheckpointEnd(Checkpoint chp) throws 
IgniteCheckedException {
             synchronized (this) {
+                writtenPagesCntr = null;
+                syncedPagesCntr = null;
+                evictedPagesCntr = null;
+
                 for (DataRegion memPlc : dataRegions()) {
                     if (!memPlc.config().isPersistenceEnabled())
                         continue;
@@ -3208,8 +3250,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         null,
                         CheckpointEntryType.END);
 
-                writtenPagesCntr = null;
-
                 currCheckpointPagesCnt = 0;
             }
 
@@ -3261,7 +3301,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         }
 
         if (persistenceCfg.getCheckpointWriteOrder() == 
CheckpointWriteOrder.SEQUENTIAL) {
-            Collections.sort(cpPagesList, new Comparator<FullPageId>() {
+            FullPageId[] objects = cpPagesList.toArray(new 
FullPageId[cpPagesList.size()]);
+
+            Arrays.parallelSort(objects, new Comparator<FullPageId>() {
                 @Override public int compare(FullPageId o1, FullPageId o2) {
                     int cmp = Long.compare(o1.groupId(), o2.groupId());
                     if (cmp != 0)
@@ -3271,6 +3313,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         PageIdUtils.effectivePageId(o2.pageId()));
                 }
             });
+
+            cpPagesList = Arrays.asList(objects);
         }
 
         int cpThreads = persistenceCfg.getCheckpointThreads();
@@ -3302,7 +3346,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         private Collection<FullPageId> writePageIds;
 
         /** */
-        private GridConcurrentHashSet<PageStore> updStores;
+        private ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
 
         /** */
         private CountDownFuture doneFut;
@@ -3322,7 +3366,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         private WriteCheckpointPages(
             final CheckpointMetricsTracker tracker,
             final Collection<FullPageId> writePageIds,
-            final GridConcurrentHashSet<PageStore> updStores,
+            final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
             final CountDownFuture doneFut,
             final int totalPagesToWrite) {
             this.tracker = tracker;
@@ -3398,7 +3442,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                         PageStore store = storeMgr.writeInternal(grpId, 
fullId.pageId(), tmpWriteBuf, tag, false);
 
-                        updStores.add(store);
+                        updStores.computeIfAbsent(store, k -> new 
LongAdder()).increment();
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java
new file mode 100644
index 0000000..db4da6b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Speed tracker for determining the speed of processing based on increments or 
exact counters value. <br>
+ * Measurement is performed using several intervals (1 current + 3 historical 
by default). <br>
+ * Too old measurements (intervals) may be dropped if automatic switch mode 
activated.<br>
+ * To determine speed current measurement is reduced with all historical.<br>
+ *     <br>
+ *  For mode of manual measurements switch it is possible to use
+ *  <br> default ctor
+ *  {@link #IntervalBasedMeasurement()} and methods <br>
+ *     {@link #setCounter(long, long)} (automatically opens interval if not 
opened) and <br>
+ *     {@link #finishInterval()} to close measurement.<br>
+ *     <br>
+ *  For mode of automatic measurements switch it is possible to use
+ *  <br> parametrized ctor
+ *  {@link #IntervalBasedMeasurement(int, int)} and methods <br>
+ *     {@link #setCounter(long, long)} (automatically opens interval if not 
opened) or
+ *     {@link #addMeasurementForAverageCalculation(long)} to provide metrics 
value in addition to event.<br>
+ *     {@link #finishInterval()} is also supported, but not required<br>
+ *     <br>
+ *
+ *  To get results of speed calculation it is possible to use <br>
+ *  Method {@link #getSpeedOpsPerSec(long)} to get current speed (and 
switch/open interval if needed). <br>
+ *   or method {@link #getSpeedOpsPerSecReadOnly()} to get current speed 
without interval modification.<br>
+ *
+ *  If metric value was provided using {@link 
#addMeasurementForAverageCalculation(long)}
+ *  then method {@link #getAverage()} can be used to get resulting metrics 
average value during period of time.
+ */
+class IntervalBasedMeasurement {
+    /** Nanos in second. */
+    private static final long NANOS_IN_SECOND = TimeUnit.SECONDS.toNanos(1);
+
+    /** Current Measurement interval atomic reference. */
+    private AtomicReference<MeasurementInterval> measurementIntervalAtomicRef 
= new AtomicReference<>();
+
+    /** Interval automatic switch nanoseconds. Negative value means no 
automatic switch. */
+    private final long intervalSwitchNanos;
+
+    /** Max historical measurements to keep. */
+    private final int maxMeasurements;
+
+    /**
+     * Previous (historical) measurements. One thread can write (winner in CAS 
of {@link
+     * #measurementIntervalAtomicRef}), all other threads may read.
+     */
+    private final ConcurrentLinkedQueue<MeasurementInterval> prevMeasurements 
= new ConcurrentLinkedQueue<>();
+
+    /**
+     * Default constructor. No automatic switch, 3 historical measurements.
+     */
+    IntervalBasedMeasurement() {
+        this(-1, 3);
+    }
+
+    /**
+     * @param intervalSwitchMs Interval switch milliseconds.
+     * @param maxMeasurements Max historical measurements to keep.
+     */
+    IntervalBasedMeasurement(int intervalSwitchMs, int maxMeasurements) {
+        this.intervalSwitchNanos = intervalSwitchMs > 0 ? intervalSwitchMs * 
TimeUnit.MILLISECONDS.toNanos(1) : -1;
+        this.maxMeasurements = maxMeasurements;
+    }
+
+    /**
+     * Gets speed, start interval (if not started).
+     *
+     * @param curNanoTime current time nanos.
+     * @return speed in pages per second based on current data.
+     */
+    long getSpeedOpsPerSec(long curNanoTime) {
+        return calcSpeed(interval(curNanoTime), curNanoTime);
+    }
+
+    /**
+     * Gets current speed, does not start measurement.
+     *
+     * @return speed in pages per second based on current data.
+     */
+    long getSpeedOpsPerSecReadOnly() {
+        MeasurementInterval interval = measurementIntervalAtomicRef.get();
+
+        long curNanoTime = System.nanoTime();
+
+        return calcSpeed(interval, curNanoTime);
+    }
+
+    /**
+     * Reduce measurements to calculate average speed.
+     *
+     * @param interval current measurement.
+     * @param curNanoTime current time in nanoseconds.
+     * @return speed in operations per second from historical only 
measurements.
+     */
+    private long calcSpeed(@Nullable MeasurementInterval interval, long 
curNanoTime) {
+        long nanosPassed = 0;
+        long opsDone = 0;
+
+        if (!isOutdated(interval, curNanoTime)) {
+            nanosPassed += curNanoTime - interval.startNanoTime;
+            opsDone += interval.cntr.get();
+        }
+
+        for (MeasurementInterval prevMeasurement : prevMeasurements) {
+            if (!isOutdated(prevMeasurement, curNanoTime)) {
+                nanosPassed += prevMeasurement.endNanoTime - 
prevMeasurement.startNanoTime;
+                opsDone += prevMeasurement.cntr.get();
+            }
+        }
+
+        return nanosPassed <= 0 ? 0 : opsDone * NANOS_IN_SECOND / nanosPassed;
+    }
+
+
+
+    /**
+     * @param interval Measurement to check. {@code null} is always outdated.
+     * @param curNanoTime Current time in nanoseconds.
+     * @return {@code True} if measurement is outdated.
+     */
+    private boolean isOutdated(@Nullable final MeasurementInterval interval, 
long curNanoTime) {
+        if (interval == null)
+            return true;
+
+        long elapsedNs = curNanoTime - interval.startNanoTime;
+
+        if (elapsedNs <= 0)
+            return true; // interval is started only now
+
+        return (intervalSwitchNanos > 0)
+            && elapsedNs > (maxMeasurements + 1) * intervalSwitchNanos;
+    }
+
+    /**
+     * Gets or creates measurement interval, performs switch to new 
measurement by timeout.
+     * @param curNanoTime current nano time.
+     * @return interval to use.
+     */
+    @NotNull private MeasurementInterval interval(long curNanoTime) {
+        MeasurementInterval interval;
+
+        do {
+            interval = measurementIntervalAtomicRef.get();
+            if (interval == null) {
+                MeasurementInterval newInterval = new 
MeasurementInterval(curNanoTime);
+
+                if (measurementIntervalAtomicRef.compareAndSet(null, 
newInterval))
+                    interval = newInterval;
+                else
+                    continue;
+            }
+
+            if (intervalSwitchNanos > 0 && (curNanoTime - 
interval.startNanoTime) > intervalSwitchNanos) {
+                MeasurementInterval newInterval = new 
MeasurementInterval(curNanoTime);
+
+                if (measurementIntervalAtomicRef.compareAndSet(interval, 
newInterval)) {
+                    interval.endNanoTime = curNanoTime;
+
+                    pushToHistory(interval);
+                }
+            }
+        }
+        while (interval == null);
+
+        return interval;
+    }
+
+    /**
+     * @param interval finished interval to push to history.
+     */
+    private void pushToHistory(MeasurementInterval interval) {
+        prevMeasurements.offer(interval);
+
+        if (prevMeasurements.size() > maxMeasurements)
+            prevMeasurements.remove();
+    }
+
+    /**
+     * Set exact value for counter in current measurement interval, useful 
only for manually managed measurements.
+     *
+     * @param val new value to set.
+     * @param curNanoTime current nano time.
+     */
+    void setCounter(long val, long curNanoTime) {
+        interval(curNanoTime).cntr.set(val);
+    }
+
+    /**
+     * Manually switch interval to empty (not started measurement).
+     */
+    void finishInterval() {
+        while (true) {
+            MeasurementInterval interval = measurementIntervalAtomicRef.get();
+
+            if (interval == null)
+                return;
+
+            if (measurementIntervalAtomicRef.compareAndSet(interval, null)) {
+                interval.endNanoTime = System.nanoTime();
+
+                pushToHistory(interval);
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Gets average metric value previously reported by {@link 
#addMeasurementForAverageCalculation(long)}.
+     * This method may start new interval measurement or switch current.
+     *
+     * @return average metric value.
+     */
+    public long getAverage() {
+        long time = System.nanoTime();
+
+        return avgMeasurementWithHistorical(interval(time), time);
+    }
+
+    /**
+     * Reduce measurements to calculate average value.
+     *
+     * @param interval current measurement. If null only historical is used.
+     * @param curNanoTime current time nanoseconds
+     * @return speed in page per second.
+     */
+    private long avgMeasurementWithHistorical(@Nullable MeasurementInterval 
interval, long curNanoTime) {
+        long cnt = 0;
+        long sum = 0;
+        if (!isOutdated(interval, curNanoTime)) {
+            cnt += interval.cntr.get();
+            sum += interval.sum.get();
+        }
+        for (MeasurementInterval prevMeasurement : prevMeasurements) {
+            if (!isOutdated(prevMeasurement, curNanoTime)) {
+                cnt += prevMeasurement.cntr.get();
+                sum += prevMeasurement.sum.get();
+            }
+        }
+
+        return cnt <= 0 ? 0 : sum / cnt;
+    }
+
+    /**
+     * Adds measurement to be used for average calculation. Calling this 
method will later allow calculating the speed at
+     * which measurements arrive. Result can be taken from {@link #getAverage()}.
+     *
+     * @param val value measured now, to be used for average calculation.
+     */
+    void addMeasurementForAverageCalculation(long val) {
+        MeasurementInterval interval = interval(System.nanoTime());
+
+        interval.cntr.incrementAndGet();
+        interval.sum.addAndGet(val);
+    }
+
+    /**
+     * Measurement interval, completed or open.
+     */
+    private static class MeasurementInterval {
+        /** Counter of performed operations, pages. */
+        private AtomicLong cntr = new AtomicLong();
+
+        /** Sum of measured value, used only for average calculation. */
+        private AtomicLong sum = new AtomicLong();
+
+        /** Timestamp in nanoseconds of measurement start. */
+        private final long startNanoTime;
+
+        /** Timestamp in nanoseconds of measurement end. 0 for open (running) 
measurements.*/
+        private volatile long endNanoTime;
+
+        /**
+         * @param startNanoTime Timestamp of measurement start.
+         */
+        MeasurementInterval(long startNanoTime) {
+            this.startNanoTime = startNanoTime;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 496a7b1..e4c369d 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -226,10 +226,10 @@ public class PageMemoryImpl implements PageMemoryEx {
     @Nullable private final GridInClosure3X<Long, FullPageId, PageMemoryEx> 
changeTracker;
 
     /** Pages write throttle. */
-    private PagesWriteThrottle writeThrottle;
+    private PagesWriteThrottlePolicy writeThrottle;
 
-    /** Write throttle enabled flag. */
-    private boolean throttleEnabled;
+    /** Write throttle type. */
+    private ThrottlingPolicy throttlingPlc;
 
     /**  */
     private boolean pageEvictWarned;
@@ -246,7 +246,7 @@ public class PageMemoryImpl implements PageMemoryEx {
      * @param pageSize Page size.
      * @param flushDirtyPage Callback invoked when a dirty page is evicted.
      * @param changeTracker Callback invoked to track changes in pages.
-     * @param throttleEnabled Write throttle enabled flag.
+     * @param throttlingPlc Write throttle enabled and type.
      */
     public PageMemoryImpl(
         DirectMemoryProvider directMemoryProvider,
@@ -257,7 +257,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         @Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> 
changeTracker,
         CheckpointLockStateChecker stateChecker,
         DataRegionMetricsImpl memMetrics,
-        boolean throttleEnabled
+        ThrottlingPolicy throttlingPlc
     ) {
         assert ctx != null;
 
@@ -269,7 +269,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         this.flushDirtyPage = flushDirtyPage;
         this.changeTracker = changeTracker;
         this.stateChecker = stateChecker;
-        this.throttleEnabled = throttleEnabled;
+        this.throttlingPlc = throttlingPlc;
 
         storeMgr = ctx.pageStore();
         walMgr = ctx.wal();
@@ -320,7 +320,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             totalAllocated += reg.size();
 
-            segments[i] = new Segment(i, regions.get(i), 
checkpointPool.pages() / segments.length, throttleEnabled);
+            segments[i] = new Segment(i, regions.get(i), 
checkpointPool.pages() / segments.length, throttlingPlc);
 
             pages += segments[i].pages();
             totalTblSize += segments[i].tableSize();
@@ -344,11 +344,17 @@ public class PageMemoryImpl implements PageMemoryEx {
             log.error("Write throttle can't start. Unexpected class of 
database manager: " +
                 ctx.database().getClass());
 
-            throttleEnabled = false;
+            throttlingPlc = ThrottlingPolicy.NONE;
         }
 
-        if (throttleEnabled)
-            writeThrottle = new PagesWriteThrottle(this, 
(GridCacheDatabaseSharedManager)ctx.database());
+        if (isThrottlingEnabled()) {
+            GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)ctx.database();
+
+            if (throttlingPlc == ThrottlingPolicy.SPEED_BASED)
+                writeThrottle = new PagesWriteSpeedBasedThrottle(this, db, 
log);
+            else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
+                writeThrottle = new PagesWriteThrottle(this, db);
+        }
     }
 
     /** {@inheritDoc} */
@@ -431,6 +437,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         assert ctx.database().checkpointLockIsHeldByThread();
 
+        if (isThrottlingEnabled())
+            writeThrottle.onMarkDirty(false);
+
         long pageId = storeMgr.allocatePage(cacheId, partId, flags);
 
         assert PageIdUtils.pageIndex(pageId) > 0; //it's crucial for tracking 
pages (zero page is super one)
@@ -876,6 +885,32 @@ public class PageMemoryImpl implements PageMemoryEx {
         return false;
     }
 
+    /**
+     * @return Max dirty ratio from the segments.
+     */
+    double getDirtyPagesRatio() {
+        double res = 0;
+
+        for (Segment segment : segments) {
+            res = Math.max(res, segment.getDirtyPagesRatio());
+        }
+
+        return res;
+    }
+
+    /**
+     * @return Total pages can be placed in all segments.
+     */
+    public long totalPages() {
+        long res = 0;
+
+        for (Segment segment : segments) {
+            res += segment.pages();
+        }
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() 
throws IgniteException {
         if (segments == null)
@@ -896,9 +931,19 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         memMetrics.resetDirtyPages();
 
+        if (isThrottlingEnabled())
+            writeThrottle.onBeginCheckpoint();
+
         return new GridMultiCollectionWrapper<>(collections);
     }
 
+    /**
+     * @return {@code True} if throttling is enabled.
+     */
+    private boolean isThrottlingEnabled() {
+        return throttlingPlc != ThrottlingPolicy.NONE;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "TooBroadScope"})
     @Override public void finishCheckpoint() {
@@ -908,7 +953,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         for (Segment seg : segments)
             seg.segCheckpointPages = null;
 
-        if (throttleEnabled)
+        if (isThrottlingEnabled())
             writeThrottle.onFinishCheckpoint();
     }
 
@@ -1327,7 +1372,11 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /**
      * @param page Page pointer.
+     * @param fullId full page ID.
+     * @param walPlc
      * @param walPlc Full page WAL record policy.
+     * @param markDirty set dirty flag to page.
+     * @param restore
      */
     private void writeUnlockPage(
         long page,
@@ -1336,13 +1385,13 @@ public class PageMemoryImpl implements PageMemoryEx {
         boolean markDirty,
         boolean restore
     ) {
-        boolean dirty = isDirty(page);
+        boolean wasDirty = isDirty(page);
 
         //if page is for restore, we shouldn't mark it as changed
-        if (!restore && markDirty && !dirty && changeTracker != null)
+        if (!restore && markDirty && !wasDirty && changeTracker != null)
             changeTracker.apply(page, fullId, this);
 
-        boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE 
|| !dirty);
+        boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE 
|| !wasDirty);
 
         assert GridUnsafe.getInt(page + PAGE_OVERHEAD + 4) == 0; //TODO 
GG-11480
 
@@ -1360,7 +1409,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         try {
             rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, 
PageIdUtils.tag(pageId));
 
-            if (throttleEnabled && !restore && markDirty)
+            if (isThrottlingEnabled() && !restore && markDirty && !wasDirty)
                 writeThrottle.onMarkDirty(isInCheckpoint(fullId));
         }
         catch (AssertionError ex) {
@@ -1449,12 +1498,13 @@ public class PageMemoryImpl implements PageMemoryEx {
     /**
      * This method must be called in synchronized context.
      *
+     * @param pageId full page ID.
      * @param absPtr Absolute pointer.
      * @param dirty {@code True} dirty flag.
      * @param forceAdd If this flag is {@code true}, then the page will be 
added to the dirty set regardless whether the
      * old flag was dirty or not.
      */
-    void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean 
forceAdd) {
+    private void setDirty(FullPageId pageId, long absPtr, boolean dirty, 
boolean forceAdd) {
         boolean wasDirty = PageHeader.dirty(absPtr, dirty);
 
         if (dirty) {
@@ -1733,9 +1783,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         /**
          * @param region Memory region.
-         * @param throttlingEnabled Write throttling enabled flag.
+         * @param throttlingPlc Policy determining whether write throttling 
is enabled, and its type.
          */
-        private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, 
boolean throttlingEnabled) {
+        private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, 
ThrottlingPolicy throttlingPlc) {
             long totalMemory = region.size();
 
             int pages = (int)(totalMemory / sysPageSize);
@@ -1752,7 +1802,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             pool = new PagePool(idx, poolRegion, null);
 
-            maxDirtyPages = throttlingEnabled ? pool.pages() * 3 / 4 : 
Math.min(pool.pages() * 2 / 3, cpPoolPages);
+            maxDirtyPages = throttlingPlc != ThrottlingPolicy.NONE
+                ? pool.pages() * 3 / 4
+                : Math.min(pool.pages() * 2 / 3, cpPoolPages);
         }
 
         /**
@@ -1780,7 +1832,14 @@ public class PageMemoryImpl implements PageMemoryEx {
          * @param dirtyRatioThreshold Throttle threshold.
          */
         private boolean shouldThrottle(double dirtyRatioThreshold) {
-            return ((double)dirtyPages.size()) / pages() > dirtyRatioThreshold;
+            return getDirtyPagesRatio() > dirtyRatioThreshold;
+        }
+
+        /**
+         * @return dirtyRatio to be compared with Throttle threshold.
+         */
+        private double getDirtyPagesRatio() {
+            return ((double)dirtyPages.size()) / pages();
         }
 
         /**
@@ -2605,4 +2664,13 @@ public class PageMemoryImpl implements PageMemoryEx {
             }
         }
     }
+
+    /**
+     * Enum defining whether throttling is enabled, and the throttling type.
+     */
+    public enum ThrottlingPolicy {
+        /** Not throttled. */NONE,
+        /** Target ratio based: CP progress is used as border. */ 
TARGET_RATIO_BASED,
+        /** Speed based. CP writing speed and estimated ideal speed are used 
as border. */ SPEED_BASED
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
new file mode 100644
index 0000000..cb19eca
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -0,0 +1,519 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteLogger;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is 
overflowed.
+ * Uses average checkpoint write speed and moment speed of marking pages as 
dirty.
+ */
+public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
+    /** Maximum dirty pages in region. */
+    private static final double MAX_DIRTY_PAGES = 0.75;
+
+    /** Page memory. */
+    private final PageMemoryImpl pageMemory;
+
+    /** Database manager. */
+    private final GridCacheDatabaseSharedManager dbSharedMgr;
+
+    /** Starting throttle time. Limits write speed to 1000 MB/s. */
+    private static final long STARTING_THROTTLE_NANOS = 4000;
+
+    /** Backoff ratio. Each next park will be this times longer. */
+    private static final double BACKOFF_RATIO = 1.05;
+
+    /** Percent of dirty pages which will not cause throttling. */
+    private static final double MIN_RATIO_NO_THROTTLE = 0.03;
+
+    /** Exponential backoff counter. */
+    private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0);
+
+    /** Counter of written pages from checkpoint. Value is saved here for 
detecting checkpoint start. */
+    private final AtomicInteger lastObservedWritten = new AtomicInteger(0);
+
+    /**
+     * Dirty pages ratio was observed at checkpoint start (here start is 
moment when first page was actually saved to
+     * store). This ratio is excluded from throttling.
+     */
+    private volatile double initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE;
+
+    /**
+     * Target (maximum) dirty pages ratio, after which throttling will start 
using
+     * {@link #getParkTime(double, long, int, int, long, long)}.
+     */
+    private volatile double targetDirtyRatio;
+
+    /**
+     * Current dirty pages ratio (percent of dirty pages in most used 
segment), negative value means no cp is running.
+     */
+    private volatile double currDirtyRatio;
+
+    /** Speed average checkpoint write speed. Current and 3 past checkpoints 
used. Pages/second. */
+    private final IntervalBasedMeasurement speedCpWrite = new 
IntervalBasedMeasurement();
+
+    /** Last estimated speed for marking all clear pages as dirty till the end 
of checkpoint. */
+    private volatile long speedForMarkAll;
+
+    /** Threads set. Contains identifiers of all threads which were marking 
pages for current checkpoint. */
+    private final GridConcurrentHashSet<Long> threadIds = new 
GridConcurrentHashSet<>();
+
+    /**
+     * Used for calculating speed of marking pages dirty.
+     * Value from past 750-1000 millis only.
+     * {@link IntervalBasedMeasurement#getSpeedOpsPerSec(long)} returns pages 
marked/second.
+     * {@link IntervalBasedMeasurement#getAverage()} returns average throttle 
time.
+     * */
+    private final IntervalBasedMeasurement speedMarkAndAvgParkTime = new 
IntervalBasedMeasurement(250, 3);
+
+    /** Total pages which is possible to store in page memory. */
+    private long totalPages;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Previous warning time, nanos. */
+    private AtomicLong prevWarnTime = new AtomicLong();
+
+    /** Warning min delay nanoseconds. */
+    private static final long WARN_MIN_DELAY_NS = TimeUnit.SECONDS.toNanos(10);
+
+    /** Warning threshold: minimal level of pressure that causes warning 
messages to log. */
+    static final double WARN_THRESHOLD = 0.2;
+
+    /**
+     * @param pageMemory Page memory.
+     * @param dbSharedMgr Database manager.
+     * @param log Logger.
+     */
+    public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory,
+        GridCacheDatabaseSharedManager dbSharedMgr, IgniteLogger log) {
+        this.pageMemory = pageMemory;
+        this.dbSharedMgr = dbSharedMgr;
+        totalPages = pageMemory.totalPages();
+        this.log = log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+        assert dbSharedMgr.checkpointLockIsHeldByThread();
+
+        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+
+        if (writtenPagesCntr == null) {
+            speedForMarkAll = 0;
+            targetDirtyRatio = -1;
+            currDirtyRatio = -1;
+
+            return; // Don't throttle if checkpoint is not running.
+        }
+
+        int cpWrittenPages = writtenPagesCntr.get();
+
+        long fullyCompletedPages = (cpWrittenPages + cpSyncedPages()) / 2; // 
written & sync'ed
+
+        long curNanoTime = System.nanoTime();
+
+        speedCpWrite.setCounter(fullyCompletedPages, curNanoTime);
+
+        long markDirtySpeed = 
speedMarkAndAvgParkTime.getSpeedOpsPerSec(curNanoTime);
+
+        long curCpWriteSpeed = speedCpWrite.getSpeedOpsPerSec(curNanoTime);
+
+        threadIds.add(Thread.currentThread().getId());
+
+        ThrottleMode level = ThrottleMode.NO; //type of delay 
(throttling) to apply for the current page modification
+
+        if (isPageInCheckpoint) {
+            int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 
2 / 3;
+
+            if (pageMemory.checkpointBufferPagesCount() > checkpointBufLimit)
+                level = ThrottleMode.EXPONENTIAL;
+        }
+
+        long throttleParkTimeNs = 0;
+
+        if (level == ThrottleMode.NO) {
+            int nThreads = threadIds.size();
+
+            int cpTotalPages = cpTotalPages();
+
+            if (cpTotalPages == 0) {
+                boolean throttleByCpSpeed = curCpWriteSpeed > 0 && 
markDirtySpeed > curCpWriteSpeed;
+
+                if (throttleByCpSpeed) {
+                    throttleParkTimeNs = calcDelayTime(curCpWriteSpeed, 
nThreads, 1);
+
+                    level = ThrottleMode.LIMITED;
+                }
+            }
+            else {
+                double dirtyPagesRatio = pageMemory.getDirtyPagesRatio();
+
+                currDirtyRatio = dirtyPagesRatio;
+
+                detectCpPagesWriteStart(cpWrittenPages, dirtyPagesRatio);
+
+                if (dirtyPagesRatio >= MAX_DIRTY_PAGES)
+                    level = ThrottleMode.NO; // too late to throttle, will 
wait on safe to update instead.
+                else {
+                    int notEvictedPagesTotal = cpTotalPages - cpEvictedPages();
+
+                    throttleParkTimeNs = getParkTime(dirtyPagesRatio,
+                        fullyCompletedPages,
+                        notEvictedPagesTotal < 0 ? 0 : notEvictedPagesTotal,
+                        nThreads,
+                        markDirtySpeed,
+                        curCpWriteSpeed);
+
+                    level = ThrottleMode.LIMITED;
+                }
+            }
+        }
+
+        if (level == ThrottleMode.NO) {
+            exponentialBackoffCntr.set(0);
+
+            throttleParkTimeNs = 0;
+        }
+        else if (level == ThrottleMode.EXPONENTIAL) {
+            int exponent = exponentialBackoffCntr.getAndIncrement();
+
+            throttleParkTimeNs = (long)(STARTING_THROTTLE_NANOS * 
Math.pow(BACKOFF_RATIO, exponent));
+        }
+
+        if (throttleParkTimeNs > 0) {
+            recurrentLogIfNeed();
+
+            doPark(throttleParkTimeNs);
+        }
+
+        
speedMarkAndAvgParkTime.addMeasurementForAverageCalculation(throttleParkTimeNs);
+    }
+
+    /**
+     * Disables the current thread for thread scheduling purposes. May be 
overridden by subclasses for tests.
+     *
+     * @param throttleParkTimeNs the maximum number of nanoseconds to wait
+     */
+    protected void doPark(long throttleParkTimeNs) {
+        LockSupport.parkNanos(throttleParkTimeNs);
+    }
+
+    /**
+     * @return number of written pages.
+     */
+    private int cpWrittenPages() {
+        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+
+        return writtenPagesCntr == null ? 0 : writtenPagesCntr.get();
+    }
+
+    /**
+     * @return Number of pages in current checkpoint.
+     */
+    private int cpTotalPages() {
+        return dbSharedMgr.currentCheckpointPagesCount();
+    }
+
+    /**
+     * @return Number of fsynced checkpoint pages.
+     */
+    private int cpSyncedPages() {
+        AtomicInteger syncedPagesCntr = dbSharedMgr.syncedPagesCounter();
+
+        return syncedPagesCntr == null ? 0 : syncedPagesCntr.get();
+    }
+
+    /**
+     * @return number of evicted pages.
+     */
+    private int cpEvictedPages() {
+        AtomicInteger evictedPagesCntr = dbSharedMgr.evictedPagesCntr();
+
+        return evictedPagesCntr == null ? 0 : evictedPagesCntr.get();
+    }
+
+    /**
+     * Prints a warning to the log if throttling occurred and took a 
noticeable amount of time.
+     */
+    private void recurrentLogIfNeed() {
+        long prevWarningNs = prevWarnTime.get();
+        long curNs = System.nanoTime();
+
+        if (prevWarningNs != 0 && (curNs - prevWarningNs) <= WARN_MIN_DELAY_NS)
+            return;
+
+        double weight = throttleWeight();
+        if (weight <= WARN_THRESHOLD)
+            return;
+
+        if (prevWarnTime.compareAndSet(prevWarningNs, curNs)) {
+            String msg = String.format("Throttling is applied to page 
modifications " +
+                    "[percentOfPartTime=%.2f, markDirty=%d pages/sec, 
checkpointWrite=%d pages/sec, " +
+                    "estIdealMarkDirty=%d pages/sec, curDirty=%.2f, 
maxDirty=%.2f, avgParkTime=%d ns, " +
+                    "pages: (total=%d, evicted=%d, written=%d, synced=%d, 
cpBufUsed=%d, cpBufTotal=%d)]",
+                weight, getMarkDirtySpeed(), getCpWriteSpeed(),
+                getLastEstimatedSpeedForMarkAll(), getCurrDirtyRatio(), 
getTargetDirtyRatio(), throttleParkTime(),
+                cpTotalPages(), cpEvictedPages(), cpWrittenPages(), 
cpSyncedPages(),
+                pageMemory.checkpointBufferPagesCount(), 
pageMemory.checkpointBufferPagesSize());
+
+            log.info(msg);
+        }
+    }
+
+    /**
+     * @param dirtyPagesRatio actual percent of dirty pages.
+     * @param fullyCompletedPages written & fsynced pages count.
+     * @param cpTotalPages total checkpoint scope.
+     * @param nThreads number of threads providing data during current 
checkpoint.
+     * @param markDirtySpeed registered mark dirty speed, pages/sec.
+     * @param curCpWriteSpeed average checkpoint write speed, pages/sec.
+     * @return time in nanoseconds to park or 0 if throttling is not required.
+     */
+    long getParkTime(
+        double dirtyPagesRatio,
+        long fullyCompletedPages,
+        int cpTotalPages,
+        int nThreads,
+        long markDirtySpeed,
+        long curCpWriteSpeed) {
+
+        long speedForMarkAll = 
calcSpeedToMarkAllSpaceTillEndOfCp(dirtyPagesRatio,
+            fullyCompletedPages,
+            curCpWriteSpeed,
+            cpTotalPages);
+
+        double targetDirtyRatio = calcTargetDirtyRatio(fullyCompletedPages, 
cpTotalPages);
+
+        this.speedForMarkAll = speedForMarkAll; //publish for metrics
+        this.targetDirtyRatio = targetDirtyRatio; //publish for metrics
+
+        boolean lowSpaceLeft = dirtyPagesRatio > targetDirtyRatio && 
(dirtyPagesRatio + 0.05 > MAX_DIRTY_PAGES);
+        int slowdown = lowSpaceLeft ? 3 : 1;
+
+        double multiplierForSpeedForMarkAll = lowSpaceLeft
+            ? 0.8
+            : 1.0;
+
+        boolean markingTooFast = speedForMarkAll > 0 && markDirtySpeed > 
multiplierForSpeedForMarkAll * speedForMarkAll;
+        boolean throttleBySizeAndMarkSpeed = dirtyPagesRatio > 
targetDirtyRatio && markingTooFast;
+
+        //for case of speedForMarkAll >> markDirtySpeed, allow write little 
bit faster than CP average
+        double allowWriteFasterThanCp = (speedForMarkAll > 0 && markDirtySpeed 
> 0 && speedForMarkAll > markDirtySpeed)
+            ? (0.1 * speedForMarkAll / markDirtySpeed)
+            : (dirtyPagesRatio > targetDirtyRatio ? 0.0 : 0.1);
+
+        double fasterThanCpWriteSpeed = lowSpaceLeft
+            ? 1.0
+            : 1.0 + allowWriteFasterThanCp;
+        boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > 
(fasterThanCpWriteSpeed * curCpWriteSpeed);
+
+        long delayByCpWrite = throttleByCpSpeed ? 
calcDelayTime(curCpWriteSpeed, nThreads, slowdown) : 0;
+        long delayByMarkAllWrite = throttleBySizeAndMarkSpeed ? 
calcDelayTime(speedForMarkAll, nThreads, slowdown) : 0;
+        return Math.max(delayByCpWrite, delayByMarkAllWrite);
+    }
+
+    /**
+     * @param dirtyPagesRatio current percent of dirty pages.
+     * @param fullyCompletedPages count of written and sync'ed pages
+     * @param curCpWriteSpeed pages/second checkpoint write speed. 0 speed 
means 'no data'.
+     * @param cpTotalPages total pages in checkpoint.
+     * @return pages/second to mark all clean pages as dirty till the 
end of checkpoint. 0 speed means 'no
+     * data'.
+     */
+    private long calcSpeedToMarkAllSpaceTillEndOfCp(double dirtyPagesRatio,
+        long fullyCompletedPages,
+        long curCpWriteSpeed,
+        int cpTotalPages) {
+
+        if (curCpWriteSpeed == 0)
+            return 0;
+
+        if (cpTotalPages <= 0)
+            return 0;
+
+        if (dirtyPagesRatio >= MAX_DIRTY_PAGES)
+            return 0;
+
+        double remainedClear = (MAX_DIRTY_PAGES - dirtyPagesRatio) * 
totalPages;
+
+        double timeRemainedSeconds = 1.0 * (cpTotalPages - 
fullyCompletedPages) / curCpWriteSpeed;
+
+        return (long)(remainedClear / timeRemainedSeconds);
+    }
+
+    /**
+     * @param fullyCompletedPages number of completed.
+     * @param cpTotalPages Total amount of pages under checkpoint.
+     * @return size-based calculation of target ratio.
+     */
+    private double calcTargetDirtyRatio(long fullyCompletedPages, int 
cpTotalPages) {
+        double cpProgress = ((double)fullyCompletedPages) / cpTotalPages;
+
+        // Starting with initDirtyRatioAtCpBegin to avoid throttling right 
after checkpoint start.
+        double constStart = initDirtyRatioAtCpBegin;
+
+        double throttleTotalWeight = 1.0 - constStart;
+
+        // .75 is maximum ratio of dirty pages
+        return (cpProgress * throttleTotalWeight + constStart) * 
MAX_DIRTY_PAGES;
+    }
+
+    /**
+     * @param baseSpeed speed to slow down.
+     * @param nThreads operating threads.
+     * @param coefficient how much it is needed to slowdown base speed. 1.0 
means delay to get exact base speed.
+     * @return sleep time in nanoseconds.
+     */
+    private long calcDelayTime(long baseSpeed, int nThreads, double 
coefficient) {
+        if (coefficient <= 0.0)
+            return 0;
+
+        if (baseSpeed <= 0)
+            return 0;
+
+        long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / 
(baseSpeed);
+
+        return (long)(coefficient * updTimeNsForOnePage);
+    }
+
+    /**
+     * @param cpWrittenPages current counter of written pages.
+     * @param dirtyPagesRatio current percent of dirty pages.
+     */
+    private void detectCpPagesWriteStart(int cpWrittenPages, double 
dirtyPagesRatio) {
+        if (cpWrittenPages > 0 && lastObservedWritten.compareAndSet(0, 
cpWrittenPages)) {
+            double newMinRatio = dirtyPagesRatio;
+
+            if (newMinRatio < MIN_RATIO_NO_THROTTLE)
+                newMinRatio = MIN_RATIO_NO_THROTTLE;
+
+            if (newMinRatio > 1)
+                newMinRatio = 1;
+
+            //checkpoint write has just started; drop the dirty page percent 
kept from the previous checkpoint
+            initDirtyRatioAtCpBegin = newMinRatio;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBeginCheckpoint() {
+        speedCpWrite.setCounter(0L, System.nanoTime());
+
+        initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE;
+
+        lastObservedWritten.set(0);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public void onFinishCheckpoint() {
+        exponentialBackoffCntr.set(0);
+
+        speedCpWrite.finishInterval();
+        speedMarkAndAvgParkTime.finishInterval();
+        threadIds.clear();
+    }
+
+    /**
+     * @return Average throttling park time in nanoseconds.
+     */
+    public long throttleParkTime() {
+        return speedMarkAndAvgParkTime.getAverage();
+    }
+
+    /**
+     * @return Target (maximum) dirty pages ratio, after which throttling will 
start.
+     */
+    public double getTargetDirtyRatio() {
+        return targetDirtyRatio;
+    }
+
+    /**
+     * @return Current dirty pages ratio.
+     */
+    public double getCurrDirtyRatio() {
+        double ratio = currDirtyRatio;
+
+        if (ratio >= 0)
+            return ratio;
+
+        return pageMemory.getDirtyPagesRatio();
+    }
+
+    /**
+     * @return  Speed of marking pages dirty. Value from past 750-1000 millis 
only. Pages/second.
+     */
+    public long getMarkDirtySpeed() {
+        return speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime());
+    }
+
+    /**
+     * @return Speed average checkpoint write speed. Current and 3 past 
checkpoints used. Pages/second.
+     */
+    public long getCpWriteSpeed() {
+        return speedCpWrite.getSpeedOpsPerSecReadOnly();
+    }
+
+    /**
+     * @return Returns {@link #speedForMarkAll}.
+     */
+    public long getLastEstimatedSpeedForMarkAll() {
+        return speedForMarkAll;
+    }
+
+    /**
+     * Measurement shows how much throttling time is involved into average 
marking time.
+     * @return metric started from 0.0 and showing how much throttling is 
involved into current marking process.
+     */
+    public double throttleWeight() {
+        long speed = 
speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime());
+
+        if (speed <= 0)
+            return 0;
+
+        long timeForOnePage = calcDelayTime(speed, threadIds.size(), 1);
+
+        if (timeForOnePage == 0)
+            return 0;
+
+        return 1.0 * throttleParkTime() / timeForOnePage;
+    }
+
+    /**
+     * Throttling mode for page.
+     */
+    private enum ThrottleMode {
+        /** No delay is applied. */
+        NO,
+
+        /** Limited, time is based on target speed. */
+        LIMITED,
+
+        /** Exponential. */
+        EXPONENTIAL
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index a890442..9206935 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -24,7 +24,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
  * Throttles threads that generate dirty pages during ongoing checkpoint.
  * Designed to avoid zero dropdowns that can happen if checkpoint buffer is 
overflowed.
  */
-public class PagesWriteThrottle {
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
     /** Page memory. */
     private final PageMemoryImpl pageMemory;
 
@@ -48,11 +48,8 @@ public class PagesWriteThrottle {
         this.dbSharedMgr = dbSharedMgr;
     }
 
-    /**
-     * Callback to apply throttling delay.
-     * @param isInCheckpoint flag indicating if checkpoint is running.
-     */
-    public void onMarkDirty(boolean isInCheckpoint) {
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
         assert dbSharedMgr.checkpointLockIsHeldByThread();
 
         AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
@@ -62,7 +59,7 @@ public class PagesWriteThrottle {
 
         boolean shouldThrottle = false;
 
-        if (isInCheckpoint) {
+        if (isPageInCheckpoint) {
             int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 
2 / 3;
 
             shouldThrottle = pageMemory.checkpointBufferPagesCount() > 
checkpointBufLimit;
@@ -96,10 +93,12 @@ public class PagesWriteThrottle {
             exponentialBackoffCntr.set(0);
     }
 
-    /**
-     *
-     */
-    public void onFinishCheckpoint() {
+    /** {@inheritDoc} */
+    @Override public void onBeginCheckpoint() {
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onFinishCheckpoint() {
         exponentialBackoffCntr.set(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
new file mode 100644
index 0000000..adeaa3d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+/**
+ * Throttling policy, encapsulates logic of delaying write operations.
+ */
+public interface PagesWriteThrottlePolicy {
+    /**
+     * Callback to apply throttling delay.
+     * @param isPageInCheckpoint flag indicating if current page is in scope 
of current checkpoint.
+     */
+    void onMarkDirty(boolean isPageInCheckpoint);
+
+    /**
+     * Callback to notify throttling policy checkpoint was started.
+     */
+    void onBeginCheckpoint();
+
+    /**
+     * Callback to notify throttling policy checkpoint was finished.
+     */
+    void onFinishCheckpoint();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
index 782949f..c17f6cb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
@@ -120,7 +120,7 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest 
extends GridCommonAbstract
 
             boolean checkpointWithLowNumOfPagesFound = false;
 
-            for (int i = 0; i < 10; i++) {
+            for (int i = 0; i < 20; i++) {
                 Random random = new Random();
                 //touch some entry
                 int d = random.nextInt(PARTS) + PARTS;
@@ -134,18 +134,29 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest 
extends GridCommonAbstract
                 if (log.isInfoEnabled())
                     log.info("Put to cache [" + fullname + "] value " + d);
 
-                final int timeout = 5000;
+                long start = System.nanoTime();
                 try {
-                    db.wakeupForCheckpoint("").get(timeout, 
TimeUnit.MILLISECONDS);
+                    final int cpTimeout = 25000;
+
+                    db.wakeupForCheckpoint("").get(cpTimeout, 
TimeUnit.MILLISECONDS);
                 }
-                catch (IgniteFutureTimeoutCheckedException e) {
+                catch (IgniteFutureTimeoutCheckedException ignored) {
+                    long msPassed = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+                    log.error("Timeout during waiting for checkpoint to 
start:" +
+                        " [" + msPassed + "] but checkpoint is not running");
+
                     continue;
                 }
 
+                final int timeout = 5000;
                 int currCpPages = 
waitForCurrentCheckpointPagesCounterUpdated(db, timeout);
 
-                if (currCpPages < 0)
+                if (currCpPages < 0) {
+                    log.error("Timeout during waiting for checkpoint counter 
to be updated");
+
                     continue;
+                }
 
                 pageCntObserved.add(currCpPages);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
new file mode 100644
index 0000000..c49f08e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import junit.framework.TestCase;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
+/**
+ * Sandbox test to measure progress of grid write operations. If no progress occurs
+ * during a period of time, thread dumps are generated.
+ */
+public class IgniteMassLoadSandboxTest extends GridCommonAbstractTest {
+    /** Cache name. Random to cover external stores possible problems. */
+    public static final String CACHE_NAME = "partitioned" + new 
Random().nextInt(10000000);
+
+    /** Object size - minimal size of object to be placed in cache. */
+    private static final int OBJECT_SIZE = 40000;
+
+    /** Records count to continuous put into cache. */
+    private static final int CONTINUOUS_PUT_RECS_CNT = 300_000;
+
+    /** Put thread: client threads naming for put operation. */
+    private static final String PUT_THREAD = "put-thread";
+
+    /** Get thread: client threads naming for verify operation. */
+    private static final String GET_THREAD = "get-thread";
+
+    /** Option to enable storage verification after test. */
+    private static final boolean VERIFY_STORAGE = false;
+
+    /**
+     * Set WAL archive and work folders to same value.  Activates 'No 
Archiver' mode.
+     * See {@link FileWriteAheadLogManager#isArchiverEnabled()}.
+     */
+    private boolean setWalArchAndWorkToSameVal;
+
+    /** Option for test run: WAL segments size in bytes. */
+    private int walSegmentSize = 64 * 1024 * 1024;
+
+    /** Option for test run: Custom WAL mode. */
+    private WALMode customWalMode;
+
+    /** Option for test run: Checkpoint frequency. */
+    private int checkpointFrequency = 40 * 1000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, HugeIndexedObject> ccfg = new 
CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 1024));
+        ccfg.setIndexedTypes(Integer.class, HugeIndexedObject.class);
+        ccfg.setName(CACHE_NAME);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        DataRegionConfiguration regCfg = new DataRegionConfiguration()
+            .setName("dfltMemPlc")
+            .setMetricsEnabled(true)
+            .setMaxSize(2 * 1024L * 1024 * 1024)
+            .setPersistenceEnabled(true);
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration();
+
+        dsCfg.setDefaultDataRegionConfiguration(regCfg)
+            .setPageSize(4 * 1024)
+            .setWriteThrottlingEnabled(true)
+            .setCheckpointFrequency(checkpointFrequency);
+
+        final String workDir = U.defaultWorkDirectory();
+        final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false);
+        final File wal = new File(db, "wal");
+        if (setWalArchAndWorkToSameVal) {
+            final String walAbsPath = wal.getAbsolutePath();
+
+            dsCfg.setWalPath(walAbsPath);
+
+            dsCfg.setWalArchivePath(walAbsPath);
+        }
+        else {
+            dsCfg.setWalPath(wal.getAbsolutePath());
+
+            dsCfg.setWalArchivePath(new File(wal, 
"archive").getAbsolutePath());
+        }
+
+        dsCfg.setWalMode(customWalMode != null ? customWalMode : 
WALMode.LOG_ONLY)
+            .setWalHistorySize(1)
+            .setWalSegments(10);
+
+        if (walSegmentSize != 0)
+            dsCfg.setWalSegmentSize(walSegmentSize);
+
+        cfg.setDataStorageConfiguration(dsCfg);
+
+        cfg.setBinaryConfiguration(new 
BinaryConfiguration().setCompactFooter(false));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
"db", false));
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
"temp", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Runs multithreaded put scenario (no data streamer). Load is generated 
to page store and to WAL.
+     * @throws Exception if failed.
+     */
+    public void testContinuousPutMultithreaded() throws Exception {
+        try {
+            // 
System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_PARALLEL, "true");
+            // 
System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, 
"true");
+            
System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, 
"false");
+            
System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED,
 "speed");
+
+            setWalArchAndWorkToSameVal = true;
+
+            customWalMode = WALMode.BACKGROUND;
+
+            final IgniteEx ignite = startGrid(1);
+
+            ignite.active(true);
+
+            final IgniteCache<Object, HugeIndexedObject> cache = 
ignite.cache(CACHE_NAME);
+            final int threads = Runtime.getRuntime().availableProcessors();
+
+            final int recsPerThread = CONTINUOUS_PUT_RECS_CNT / threads;
+
+            final Collection<Callable<?>> tasks = new ArrayList<>();
+
+            final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, 
"put", PUT_THREAD);
+
+            for (int j = 0; j < threads; j++) {
+                final int finalJ = j;
+
+                tasks.add(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        for (int i = finalJ * recsPerThread; i < ((finalJ + 1) 
* recsPerThread); i++) {
+                            HugeIndexedObject v = new HugeIndexedObject(i);
+                            cache.put(i, v);
+                            watchdog.reportProgress(1);
+                        }
+                        return null;
+                    }
+                });
+            }
+
+            watchdog.start();
+            GridTestUtils.runMultiThreaded(tasks, PUT_THREAD);
+
+            watchdog.stopping();
+            stopGrid(1);
+
+            watchdog.stop();
+
+            if (VERIFY_STORAGE)
+                runVerification(threads, recsPerThread);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs multithreaded put scenario (no data streamer). Load is generated 
to page store and to WAL.
+     * @throws Exception if failed.
+     */
+    public void testDataStreamerContinuousPutMultithreaded() throws Exception {
+        try {
+            // 
System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_PARALLEL, "true");
+            // 
System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, 
"true");
+            
System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, 
"false");
+            
System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED,
 "speed");
+
+            setWalArchAndWorkToSameVal = true;
+
+            customWalMode = WALMode.BACKGROUND;
+
+            final IgniteEx ignite = startGrid(1);
+
+            ignite.active(true);
+
+            final int threads = 1; Runtime.getRuntime().availableProcessors();
+
+            final int recsPerThread = CONTINUOUS_PUT_RECS_CNT / threads;
+
+            final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, 
"put", PUT_THREAD);
+
+            IgniteDataStreamer<Object, Object> streamer = 
ignite.dataStreamer(CACHE_NAME);
+
+            streamer.perNodeBufferSize(12);
+
+            final Collection<Callable<?>> tasks = new ArrayList<>();
+            for (int j = 0; j < threads; j++) {
+                final int finalJ = j;
+
+                tasks.add((Callable<Void>)() -> {
+                    for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * 
recsPerThread); i++)
+                        streamer.addData(i, new HugeIndexedObject(i));
+
+                    return null;
+                });
+            }
+
+            final IgniteCache<Object, HugeIndexedObject> cache = 
ignite.cache(CACHE_NAME);
+            ScheduledExecutorService svcReport = 
Executors.newScheduledThreadPool(1);
+
+            AtomicInteger size = new AtomicInteger();
+            svcReport.scheduleAtFixedRate(
+                () -> {
+                    int newSize = cache.size();
+                    int oldSize = size.getAndSet(newSize);
+
+                    watchdog.reportProgress(newSize - oldSize);
+                },
+                250, 250, TimeUnit.MILLISECONDS);
+
+            watchdog.start();
+            GridTestUtils.runMultiThreaded(tasks, PUT_THREAD);
+            streamer.close();
+
+            watchdog.stopping();
+            stopGrid(1);
+
+            watchdog.stop();
+
+            ProgressWatchdog.stopPool(svcReport);
+
+            if (VERIFY_STORAGE)
+                runVerification(threads, recsPerThread);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+
+    /**
+     * Verifies data from storage.
+     *
+     * @param threads threads count.
+     * @param recsPerThread record per thread loaded.
+     * @throws Exception if failed
+     */
+    private void runVerification(int threads, final int recsPerThread) throws 
Exception {
+        final Ignite restartedIgnite = startGrid(1);
+
+        restartedIgnite.active(true);
+
+        final IgniteCache<Integer, HugeIndexedObject> restartedCache = 
restartedIgnite.cache(CACHE_NAME);
+
+        final ProgressWatchdog watchdog2 = new 
ProgressWatchdog(restartedIgnite, "get", GET_THREAD);
+
+        final Collection<Callable<?>> tasksR = new ArrayList<>();
+        tasksR.clear();
+        for (int j = 0; j < threads; j++) {
+            final int finalJ = j;
+            tasksR.add(new Callable<Void>() {
+                @Override public Void call() {
+                    for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * 
recsPerThread); i++) {
+                        HugeIndexedObject obj = restartedCache.get(i);
+                        int actVal = obj.iVal;
+                        TestCase.assertEquals(i, actVal);
+                        watchdog2.reportProgress(1);
+                    }
+                    return null;
+                }
+            });
+        }
+
+        watchdog2.start();
+        GridTestUtils.runMultiThreaded(tasksR, GET_THREAD);
+        watchdog2.stop();
+    }
+
+    /**
+     * @param threads Threads count.
+     * @param recsPerThread initial records per thread.
+     * @param restartedCache cache to obtain data from.
+     */
+    private void verifyByChunk(int threads, int recsPerThread, Cache<Integer, 
HugeIndexedObject> restartedCache) {
+        int verifyChunk = 100;
+
+        int totalRecsToVerify = recsPerThread * threads;
+        int chunks = totalRecsToVerify / verifyChunk;
+
+        for (int c = 0; c < chunks; c++) {
+            Set<Integer> keys = new TreeSet<>();
+
+            for (int i = 0; i < verifyChunk; i++)
+                keys.add(i + c * verifyChunk);
+
+            Map<Integer, HugeIndexedObject> values = 
restartedCache.getAll(keys);
+
+            for (Map.Entry<Integer, HugeIndexedObject> next : 
values.entrySet()) {
+                Integer key = next.getKey();
+
+                int actVal = values.get(next.getKey()).iVal;
+                int i = key;
+                TestCase.assertEquals(i, actVal);
+
+                if (i % 1000 == 0)
+                    X.println(" >> Verified: " + i);
+            }
+
+        }
+    }
+
+    /**
+     * @param id entry id.
+     * @return {@code True} if need to keep entry in DB and checkpoint it. 
Most of entries not required.
+     */
+    private static boolean keepInDb(int id) {
+        return id % 1777 == 0;
+    }
+
+    /**
+     * Runs multithreaded put-remove scenario (no data streamer). Load is 
generated to WAL log mostly.
+     * Most of entries generated will be removed before first checkpoint.
+     *
+     * @throws Exception if failed.
+     */
+    public void testPutRemoveMultithreaded() throws Exception {
+        setWalArchAndWorkToSameVal = false;
+        customWalMode = WALMode.LOG_ONLY;
+
+        try {
+            final IgniteEx ignite = startGrid(1);
+
+            ignite.active(true);
+            checkpointFrequency = 20 * 1000;
+            final IgniteCache<Object, HugeIndexedObject> cache = 
ignite.cache(CACHE_NAME);
+            int totalRecs = 400_000;
+            final int threads = 10;
+            final int recsPerThread = totalRecs / threads;
+            final Collection<Callable<?>> tasks = new ArrayList<>();
+            final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, 
"put", PUT_THREAD);
+
+            for (int j = 0; j < threads; j++) {
+                final int finalJ = j;
+
+                tasks.add(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        final Collection<Integer> toRmvLaterList = new 
ArrayList<>();
+
+                        for (int id = finalJ * recsPerThread; id < ((finalJ + 
1) * recsPerThread); id++) {
+                            HugeIndexedObject v = new HugeIndexedObject(id);
+
+                            cache.put(id, v);
+                            toRmvLaterList.add(id);
+                            watchdog.reportProgress(1);
+
+                            if (toRmvLaterList.size() > 100) {
+                                for (Integer toRemoveId : toRmvLaterList) {
+                                    if (keepInDb(toRemoveId))
+                                        continue;
+
+                                    boolean rmv = cache.remove(toRemoveId);
+                                    assert rmv : "Expected to remove object 
from cache " + toRemoveId;
+                                }
+                                toRmvLaterList.clear();
+                            }
+                        }
+                        return null;
+                    }
+                });
+            }
+
+            watchdog.start();
+            GridTestUtils.runMultiThreaded(tasks, PUT_THREAD);
+            watchdog.stop();
+            stopGrid(1);
+
+            final Ignite restartedIgnite = startGrid(1);
+
+            restartedIgnite.active(true);
+
+            final IgniteCache<Object, HugeIndexedObject> restartedCache = 
restartedIgnite.cache(CACHE_NAME);
+
+            for (int i = 0; i < recsPerThread * threads; i++) {
+                if (keepInDb(i)) {
+                    final HugeIndexedObject obj = restartedCache.get(i);
+
+                    TestCase.assertNotNull(obj);
+                    TestCase.assertEquals(i, obj.iVal);
+                }
+
+                if (i % 1000 == 0)
+                    X.print(" V: " + i);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TimeUnit.MINUTES.toMillis(20);
+    }
+
+    /** Object with additional 40 000 bytes of payload */
+    public static class HugeIndexedObject   {
+        /** Data. */
+        private byte[] data;
+        /** */
+        @QuerySqlField(index = true)
+        private int iVal;
+
+        /**
+         * @param iVal Integer value.
+         */
+        private HugeIndexedObject(int iVal) {
+            this.iVal = iVal;
+
+            int sz = OBJECT_SIZE;
+
+            data = new byte[sz];
+            for (int i = 0; i < sz; i++)
+                data[i] = (byte)('A' + (i % 10));
+        }
+
+        /**
+         * @return Data.
+         */
+        public byte[] data() {
+            return data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof HugeIndexedObject))
+                return false;
+
+            HugeIndexedObject that = (HugeIndexedObject)o;
+
+            return iVal == that.iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(HugeIndexedObject.class, this);
+        }
+    }
+}
\ No newline at end of file

Reply via email to