http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java
new file mode 100644
index 0000000..55ca2a9
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.ignite.DataRegionMetrics;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteSpeedBasedThrottle;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Process watchdog for ignite instance. Detects gaps in progress, prints and 
saves summary of execution metrics to
+ * file. Client implementation should use {@link 
ProgressWatchdog#reportProgress(int)} to show how much data was
+ * processed at client size. Long absence of this calls will cause thread 
dumps to be generated.
+ */
+class ProgressWatchdog {
+    public static final int CHECK_PERIOD_MSEC = 1000;
+    /** Progress counter, Overall records processed. */
+    private final LongAdder overallRecordsProcessed = new LongAdder();
+
+    /** Metrics log Txt file writer. */
+    private final FileWriter txtWriter;
+
+    /** Client threads name, included into thread dumps. */
+    private String clientThreadsName;
+
+    /** Service for scheduled execution of watchdog. */
+    private ScheduledExecutorService svc = Executors.newScheduledThreadPool(1);
+
+    /** Operation name for messages and log. */
+    private final String operation;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** Stopping flag, indicates ignite close was called but stop is not 
finished. */
+    private volatile boolean stopping;
+
+    /** Value of records count at previous tick, see {@link 
#overallRecordsProcessed} */
+    private final AtomicLong prevRecordsCnt = new AtomicLong();
+
+    /** Milliseconds elapsed at previous tick. */
+    private final AtomicLong prevMsElapsed = new AtomicLong();
+
+    /** Checkpoint written pages at previous tick. */
+    private final AtomicLong prevCpWrittenPages = new AtomicLong();
+
+    /** Checkpoint fsync()'ed pages at previous tick. */
+    private final AtomicLong prevCpSyncedPages = new AtomicLong();
+
+    /** WAL pointer at previous tick reference. */
+    private final AtomicReference<FileWALPointer> prevWalPtrRef = new 
AtomicReference<>();
+
+    /** Milliseconds at start of watchdog execution. */
+    private long msStart;
+
+    /**
+     * Creates watchdog.
+     *
+     * @param ignite Ignite.
+     * @param operation Operation name for log.
+     * @param clientThreadsName Client threads name.
+     */
+    ProgressWatchdog(Ignite ignite, String operation,
+        String clientThreadsName) throws IgniteCheckedException, IOException {
+        this.ignite = ignite;
+        this.operation = operation;
+        txtWriter = new FileWriter(new File(getTempDirFile(), "watchdog-" + 
operation + ".txt"));
+        this.clientThreadsName = clientThreadsName;
+        line("sec",
+            "cur." + operation + "/sec",
+            "WAL speed, MB/s.",
+            "cp. speed, MB/sec",
+            "cp. sync., MB/sec",
+            "WAL work seg.",
+            "Throttle part time",
+            "curDirtyRatio",
+            "targetDirtyRatio",
+            "throttleWeigth",
+            "markDirtySpeed",
+            "cpWriteSpeed",
+            "estMarkAllSpeed",
+            "avg." + operation + "/sec",
+            "dirtyPages",
+            "cpWrittenPages",
+            "cpSyncedPages",
+            "cpEvictedPages",
+            "WAL idx",
+            "Arch. idx",
+            "WAL Archive seg.");
+    }
+
+    /**
+     * @return temp dir to place watchdog report.
+     * @throws IgniteCheckedException if failed.
+     */
+    @NotNull private static File getTempDirFile() throws 
IgniteCheckedException {
+        File tempDir = new File(U.defaultWorkDirectory(), "temp");
+
+        if (!tempDir.exists())
+            tempDir.mkdirs();
+
+        return tempDir;
+    }
+
+    /**
+     * Generates limited thread dump with only threads involved into 
persistence.
+     *
+     * @return string to log.
+     */
+    private String generateThreadDump() {
+        final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+        int depth = 100;
+        final ThreadInfo[] threadInfos = 
threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), depth);
+        final StringBuilder dump = new StringBuilder();
+
+        for (ThreadInfo threadInfo : threadInfos) {
+            String name = threadInfo.getThreadName();
+
+            if (name.contains("checkpoint-runner")
+                || name.contains("db-checkpoint-thread")
+                || name.contains("wal-file-archiver")
+                || name.contains("data-streamer")
+                || (clientThreadsName!=null && 
name.contains(clientThreadsName))) {
+                String str = threadInfo.toString();
+
+                if (name.contains("db-checkpoint-thread")) {
+                    dump.append(str);
+                    dump.append("(Full stacktrace)");
+
+                    StackTraceElement[] stackTrace = 
threadInfo.getStackTrace();
+                    int i = 0;
+
+                    for (; i < stackTrace.length && i < depth; i++) {
+                        StackTraceElement ste = stackTrace[i];
+                        dump.append("\tat ").append(ste.toString());
+                        dump.append('\n');
+                    }
+
+                    if (i < stackTrace.length) {
+                        dump.append("\t...");
+                        dump.append('\n');
+                    }
+
+                    dump.append('\n');
+                }
+                else
+                    dump.append(str);
+            }
+            else {
+                String s = threadInfo.toString();
+
+                if (s.contains(FileWriteAheadLogManager.class.getSimpleName())
+                    || s.contains(FilePageStoreManager.class.getSimpleName()))
+                    dump.append(s);
+            }
+        }
+        return dump.toString();
+    }
+
+    /**
+     * Adds line to txt log {@link #txtWriter}.
+     * @param parms values to log
+     */
+    private void line(Object... parms) {
+        try {
+            for (int i = 0; i < parms.length; i++) {
+                Object parm = parms[i];
+                String delim = (i < parms.length - 1) ? "\t" : "\n";
+
+                txtWriter.write(parm + delim);
+            }
+
+            txtWriter.flush();
+        }
+        catch (IOException ignored) {
+        }
+    }
+
+    /**
+     * Starts watchdog execution.
+     */
+    public void start() {
+        msStart = U.currentTimeMillis();
+        prevMsElapsed.set(0);
+        prevRecordsCnt.set(0);
+        prevCpWrittenPages.set(0);
+        prevCpSyncedPages.set(0);
+        prevWalPtrRef.set(null);
+
+        svc.scheduleAtFixedRate(
+            this::tick,
+            CHECK_PERIOD_MSEC, CHECK_PERIOD_MSEC, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Regular method printing statistics to out and to log. Checks gaps in 
progress.
+     */
+    private void tick() {
+        long elapsedMs = U.currentTimeMillis() - msStart;
+        final long totalCnt = overallRecordsProcessed.longValue();
+        long elapsedMsFromPrevTick = elapsedMs - 
prevMsElapsed.getAndSet(elapsedMs);
+        if (elapsedMsFromPrevTick == 0)
+            return;
+
+        final long currPutPerSec = ((totalCnt - 
prevRecordsCnt.getAndSet(totalCnt)) * 1000) / elapsedMsFromPrevTick;
+        final long averagePutPerSec = totalCnt * 1000 / elapsedMs;
+        boolean slowProgress = currPutPerSec < averagePutPerSec / 10 && 
!stopping;
+        final String fileNameWithDump = slowProgress ? reactNoProgress() : "";
+
+        DataStorageConfiguration dsCfg = 
ignite.configuration().getDataStorageConfiguration();
+
+        String defRegName = 
dsCfg.getDefaultDataRegionConfiguration().getName();
+        long dirtyPages = -1;
+        for (DataRegionMetrics m : ignite.dataRegionMetrics())
+            if (m.getName().equals(defRegName))
+                dirtyPages = m.getDirtyPages();
+
+        GridCacheSharedContext<Object, Object> cacheSctx = null;
+        PageMemoryImpl pageMemory = null;
+        try {
+            cacheSctx = ((IgniteEx)ignite).context().cache().context();
+            pageMemory = 
(PageMemoryImpl)cacheSctx.database().dataRegion(defRegName).pageMemory();
+        }
+        catch (IgniteCheckedException e) {
+            e.printStackTrace();
+        }
+
+        long cpBufPages = 0;
+
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)(cacheSctx.database());
+        AtomicInteger wrPageCntr = db.writtenPagesCounter();
+        long cpWrittenPages = wrPageCntr == null ? 0 : wrPageCntr.get();
+
+        AtomicInteger syncedPagesCntr = db.syncedPagesCounter();
+        int cpSyncedPages = syncedPagesCntr == null ? 0 : 
syncedPagesCntr.get();
+
+        AtomicInteger evictedPagesCntr = db.evictedPagesCntr();
+        int cpEvictedPages = evictedPagesCntr == null ? 0 : 
evictedPagesCntr.get();
+
+        int pageSize = pageMemory == null ? 0 : pageMemory.pageSize();
+
+        String cpWriteSpeed = getMBytesPrintable(
+            detectDelta(elapsedMsFromPrevTick, cpWrittenPages, 
prevCpWrittenPages) * pageSize);
+
+        String cpSyncSpeed = getMBytesPrintable(
+            detectDelta(elapsedMsFromPrevTick, cpSyncedPages, 
prevCpSyncedPages) * pageSize);
+
+        String walSpeed = "";
+        long throttleParkTimeNanos = 0;
+        double curDirtyRatio = 0.0;
+        String targetDirtyRatioStr = "";
+        double closeToThrottle = 0.0;
+        long idx = -1;
+        long lastArchIdx = -1;
+        int walArchiveSegments = 0;
+        long walWorkSegments = 0;
+        long markDirtySpeed = 0;
+        long cpWriteSpeedInPages = 0;
+        long estWrAllSpeed = 0;
+
+        try {
+            if (pageMemory != null) {
+                cpBufPages = pageMemory.checkpointBufferPagesCount();
+
+                PagesWriteSpeedBasedThrottle throttle = U.field(pageMemory, 
"writeThrottle");
+
+                if (throttle != null) {
+                    curDirtyRatio = throttle.getCurrDirtyRatio();
+
+                    double targetDirtyRatio = throttle.getTargetDirtyRatio();
+
+                    targetDirtyRatioStr = targetDirtyRatio < 0 ? "" : 
formatDbl(targetDirtyRatio);
+
+                    closeToThrottle = throttle.throttleWeight();
+                    throttleParkTimeNanos = throttle.throttleParkTime();
+                    markDirtySpeed = throttle.getMarkDirtySpeed();
+                    cpWriteSpeedInPages = throttle.getCpWriteSpeed();
+                    estWrAllSpeed = throttle.getLastEstimatedSpeedForMarkAll();
+                    if (estWrAllSpeed > 99_999)
+                        estWrAllSpeed = 99_999;
+                }
+            }
+
+            FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)cacheSctx.wal();
+
+            idx = 0;
+            lastArchIdx = 0;
+
+            walArchiveSegments = wal.walArchiveSegments();
+            walWorkSegments = idx - lastArchIdx;
+
+            /* // uncomment when currentWritePointer is available
+             FileWALPointer ptr = wal.currentWritePointer();
+               FileWALPointer prevWalPtr = this.prevWalPtrRef.getAndSet(ptr);
+
+               if (prevWalPtr != null) {
+                   long idxDiff = ptr.index() - prevWalPtr.index();
+                   long offDiff = ptr.fileOffset() - prevWalPtr.fileOffset();
+                   long bytesDiff = idxDiff * maxWalSegmentSize + offDiff;
+
+                   long bytesPerSec = (bytesDiff * 1000) / 
elapsedMsFromPrevTick;
+
+                   walSpeed = getMBytesPrintable(bytesPerSec);
+               } else
+                   walSpeed = "0";
+             */
+
+            walSpeed = "0";
+        }
+        catch (Exception e) {
+            X.error(e.getClass().getSimpleName() + ":" + e.getMessage());
+        }
+
+        long elapsedSecs = elapsedMs / 1000;
+        X.println(" >> " +
+            operation +
+            " done: " + totalCnt + "/" + elapsedSecs + "s, " +
+            "Cur. " + operation + " " + currPutPerSec + " recs/sec " +
+            "cpWriteSpeed=" + cpWriteSpeed + " " +
+            "cpSyncSpeed=" + cpSyncSpeed + " " +
+            "walSpeed= " + walSpeed + " " +
+            "walWorkSeg.="+walWorkSegments + " " +
+            "markDirtySpeed=" + markDirtySpeed +" " +
+            "Avg. " + operation + " " + averagePutPerSec + " recs/sec, " +
+            "dirtyP=" + dirtyPages + ", " +
+            "cpWrittenP.=" + cpWrittenPages + ", " +
+            "cpBufP.=" + cpBufPages + " " +
+            "threshold=" + targetDirtyRatioStr + " " +
+            "walIdx=" + idx + " " +
+            "archWalIdx=" + lastArchIdx + " " +
+            "walArchiveSegments=" + walArchiveSegments + " " +
+            fileNameWithDump);
+
+        line(elapsedSecs,
+            currPutPerSec,
+            walSpeed,
+            cpWriteSpeed,
+            cpSyncSpeed,
+            walWorkSegments,
+            throttleParkTimeNanos,
+            formatDbl(curDirtyRatio),
+            targetDirtyRatioStr,
+            formatDbl(closeToThrottle),
+            markDirtySpeed,
+            cpWriteSpeedInPages,
+            estWrAllSpeed,
+            averagePutPerSec,
+            dirtyPages,
+            cpWrittenPages,
+            cpSyncedPages,
+            cpEvictedPages,
+            idx,
+            lastArchIdx,
+            walArchiveSegments
+        );
+    }
+
+    /**
+     * @param val value to log.
+     * @return formatted value for txt log.
+     */
+    private String formatDbl(double val) {
+        return String.format("%.2f", val).replace(",", ".");
+    }
+
+    /**
+     * Converts bytes counter to MegaBytes
+     * @param currBytesWritten bytes.
+     * @return string with megabytes as printable string.
+     */
+    private String getMBytesPrintable(long currBytesWritten) {
+        double cpMbPs = 1.0 * currBytesWritten / (1024 * 1024);
+
+        return formatDbl(cpMbPs);
+    }
+
+    /**
+     * @param elapsedMsFromPrevTick time from previous tick, millis.
+     * @param absVal current value
+     * @param cnt counter stores previous value.
+     * @return value change from previous tick.
+     */
+    private long detectDelta(long elapsedMsFromPrevTick, long absVal, 
AtomicLong cnt) {
+        long cpPagesChange = absVal - cnt.getAndSet(absVal);
+
+        if (cpPagesChange < 0)
+            cpPagesChange = 0;
+
+        return (cpPagesChange * 1000) / elapsedMsFromPrevTick;
+    }
+
+    /**
+     * @return file name with dump created.
+     */
+    private String reactNoProgress() {
+        try {
+            String threadDump = generateThreadDump();
+
+            long sec = TimeUnit.MILLISECONDS.toSeconds(U.currentTimeMillis() - 
msStart);
+
+            String fileName = "dumpAt" + sec + "second.txt";
+
+            if 
(threadDump.contains(IgniteCacheDatabaseSharedManager.class.getName() + 
".checkpointLock"))
+                fileName = "checkpoint_" + fileName;
+
+            fileName = operation + fileName;
+
+            try (FileWriter writer = new FileWriter(new File(getTempDirFile(), 
fileName))) {
+                writer.write(threadDump);
+            }
+
+            return fileName;
+        }
+        catch (IOException | IgniteCheckedException e) {
+            e.printStackTrace();
+        }
+        return "";
+    }
+
+    /**
+     * @param cnt counter of entries/operations processed since last call.
+     */
+    public void reportProgress(int cnt) {
+        overallRecordsProcessed.add(cnt);
+    }
+
+    /**
+     * Call this method to indicate client operation is done, and ignite is 
stopping.
+     */
+    public void stopping() {
+        stopping = true;
+    }
+
+    /**
+     * Stops watchdog threads.
+     */
+    public void stop() {
+        U.closeQuiet(txtWriter);
+
+        ScheduledExecutorService pool = this.svc;
+        stopPool(pool);
+    }
+
+    public static void stopPool(ExecutorService pool) {
+        pool.shutdown();
+        try {
+            pool.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e) {
+            e.printStackTrace();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 13e811c..1b6fde3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -84,7 +84,7 @@ public class BPlusTreePageMemoryImplTest extends 
BPlusTreeSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
 
         mem.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index ce5deba..1180164 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -84,7 +84,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends 
BPlusTreeReuseSelfTest
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
 
         mem.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
new file mode 100644
index 0000000..054696c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteLogger;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.logger.NullLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class IgniteThrottlingUnitTest {
+    /** Logger. */
+    private IgniteLogger log = new NullLogger();
+
+    /** Page memory 2 g. */
+    private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class);
+
+    {
+        when(pageMemory2g.totalPages()).thenReturn((2L * 1024 * 1024 * 1024) / 
4096);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void breakInCaseTooFast() {
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        long time = throttle.getParkTime(0.67,
+            (362584 + 67064) / 2,
+            328787,
+            1,
+            60184,
+            23103);
+
+        assertTrue(time > 0);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void noBreakIfNotFastWrite() {
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        long time = throttle.getParkTime(0.47,
+            ((362584 + 67064) / 2),
+            328787,
+            1,
+            20103,
+            23103);
+
+        assertTrue(time == 0);
+    }
+
+    /**
+     * @throws InterruptedException if interrupted.
+     */
+    @Test
+    public void averageCalculation() throws InterruptedException {
+        IntervalBasedMeasurement measurement = new 
IntervalBasedMeasurement(100, 1);
+
+        for (int i = 0; i < 1000; i++)
+            measurement.addMeasurementForAverageCalculation(100);
+
+        assertEquals(100, measurement.getAverage());
+
+        Thread.sleep(220);
+
+        assertEquals(0, measurement.getAverage());
+
+        assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime()));
+    }
+
+    /**
+     * @throws InterruptedException if interrupted.
+     */
+    @Test
+    public void speedCalculation() throws InterruptedException {
+        IntervalBasedMeasurement measurement = new 
IntervalBasedMeasurement(100, 1);
+
+        for (int i = 0; i < 1000; i++)
+            measurement.setCounter(i, System.nanoTime());
+
+        long speed = measurement.getSpeedOpsPerSec(System.nanoTime());
+        System.out.println("speed measured " + speed);
+        assertTrue(speed > 1000);
+
+        Thread.sleep(230);
+
+        assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime()));
+    }
+
+    /**
+     * @throws InterruptedException if interrupted.
+     */
+    @Test
+    public void speedWithDelayCalculation() throws InterruptedException {
+        IntervalBasedMeasurement measurement = new 
IntervalBasedMeasurement(100, 1);
+
+        int runs = 10;
+        int nanosPark = 100;
+        int multiplier = 100000;
+        for (int i = 0; i < runs; i++) {
+            measurement.setCounter(i * multiplier, System.nanoTime());
+
+            LockSupport.parkNanos(nanosPark);
+        }
+
+        long speed = measurement.getSpeedOpsPerSec(System.nanoTime());
+
+        assertTrue(speed > 0);
+        long maxSpeed = (TimeUnit.SECONDS.toNanos(1) * multiplier * runs) / 
((long)(runs * nanosPark));
+        assertTrue(speed < maxSpeed);
+
+        Thread.sleep(200);
+
+        assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime()));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void beginOfCp() {
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        assertTrue(throttle.getParkTime(0.01, 100,400000,
+            1,
+            20103,
+            23103) == 0);
+
+        //mark speed 22413 for mark all remaining as dirty
+        long time = throttle.getParkTime(0.024, 100, 400000,
+            1,
+            24000,
+            23103);
+        assertTrue(time > 0);
+
+        assertTrue(throttle.getParkTime(0.01,
+            100,
+            400000,
+            1,
+            22412,
+            23103) == 0);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void enforceThrottleAtTheEndOfCp() {
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        long time1 = throttle.getParkTime(0.70, 300000, 400000,
+            1, 20200, 23000);
+        long time2 = throttle.getParkTime(0.71, 300000, 400000,
+            1, 20200, 23000);
+
+        assertTrue(time2 >= time1 * 2); // extra slowdown should be applied.
+
+        long time3 = throttle.getParkTime(0.73, 300000, 400000,
+            1, 20200, 23000);
+        long time4 = throttle.getParkTime(0.74, 300000, 400000,
+            1, 20200, 23000);
+
+        assertTrue(time3 > time2);
+        assertTrue(time4 > time3);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void tooMuchPagesMarkedDirty() {
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+       // 363308       350004  348976  10604
+        long time = throttle.getParkTime(0.75,
+            ((350004 + 348976) / 2),
+            350004-10604,
+            4,
+            279,
+            23933);
+
+        System.err.println(time);
+
+        assertTrue(time == 0);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void warningInCaseTooMuchThrottling() {
+        AtomicInteger warnings = new AtomicInteger(0);
+        IgniteLogger log = mock(IgniteLogger.class);
+
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+
+            System.out.println("log.info() called with arguments: " + 
Arrays.toString(args));
+
+            warnings.incrementAndGet();
+
+            return null;
+        }).when(log).info(anyString());
+
+        AtomicInteger written = new AtomicInteger();
+        GridCacheDatabaseSharedManager db = 
mock(GridCacheDatabaseSharedManager.class);
+        when(db.checkpointLockIsHeldByThread()).thenReturn(true);
+        when(db.writtenPagesCounter()).thenReturn(written);
+
+        PagesWriteSpeedBasedThrottle throttle = new 
PagesWriteSpeedBasedThrottle(pageMemory2g, db, log) {
+            @Override protected void doPark(long throttleParkTimeNs) {
+                //do nothing
+            }
+        };
+        throttle.onBeginCheckpoint();
+        written.set(200); //emulating some pages written
+
+        for (int i = 0; i < 100000; i++) {
+            //emulating high load on marking
+            throttle.onMarkDirty(false);
+
+            if (throttle.throttleWeight() > 
PagesWriteSpeedBasedThrottle.WARN_THRESHOLD)
+                break;
+        }
+
+        for (int i = 0; i < 1000; i++) {
+            //emulating additional page writes to be sure log message is 
generated
+
+            throttle.onMarkDirty(false);
+
+            if(warnings.get()>0)
+                break;
+        }
+
+        System.out.println(throttle.throttleWeight());
+
+        assertTrue(warnings.get() > 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
index effbe7e..04f3bd0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
@@ -99,7 +99,7 @@ public class IndexStoragePageMemoryImplTest extends 
IndexStorageSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 3d806b1..ed285c5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -90,7 +90,7 @@ public class PageMemoryImplNoLoadTest extends 
PageMemoryNoLoadSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 5c91c59..1369f28 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -122,7 +122,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
                 }
             },
             new 
DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
 
         mem.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index a7c549d..f957aec 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -19,7 +19,6 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import 
org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistence;
-import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicCacheTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
new file mode 100644
index 0000000..a2bfeb3
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.testsuites;
+
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.IgniteThrottlingUnitTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ *
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    IgniteThrottlingUnitTest.class
+})
+public class IgnitePdsUnitTestSuite {
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
index 22bd610..0546459 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
@@ -30,12 +30,16 @@ import junit.framework.TestSuite;
  * This suite is not included into main build
  */
 public class IgniteReproducingSuite extends TestSuite {
+    /**
+     * @return suite with test(s) for reproduction some problem.
+     * @throws Exception if failed.
+     */
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Ignite Issue Reproducing Test Suite");
 
         //uncomment to add some test
         //for (int i = 0; i < 100; i++)
-        //    suite.addTestSuite();
+        //    
suite.addTestSuite(IgniteCheckpointDirtyPagesForLowLoadTest.class);
 
         return suite;
     }

Reply via email to