http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java new file mode 100644 index 0000000..55ca2a9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import org.apache.ignite.DataRegionMetrics; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteSpeedBasedThrottle; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; + +/** + * Process watchdog for ignite instance. Detects gaps in progress, prints and saves summary of execution metrics to + * file. Client implementation should use {@link ProgressWatchdog#reportProgress(int)} to show how much data was + * processed at client size. Long absence of this calls will cause thread dumps to be generated. + */ +class ProgressWatchdog { + public static final int CHECK_PERIOD_MSEC = 1000; + /** Progress counter, Overall records processed. */ + private final LongAdder overallRecordsProcessed = new LongAdder(); + + /** Metrics log Txt file writer. */ + private final FileWriter txtWriter; + + /** Client threads name, included into thread dumps. */ + private String clientThreadsName; + + /** Service for scheduled execution of watchdog. */ + private ScheduledExecutorService svc = Executors.newScheduledThreadPool(1); + + /** Operation name for messages and log. */ + private final String operation; + + /** Ignite instance. */ + private Ignite ignite; + + /** Stopping flag, indicates ignite close was called but stop is not finished. */ + private volatile boolean stopping; + + /** Value of records count at previous tick, see {@link #overallRecordsProcessed} */ + private final AtomicLong prevRecordsCnt = new AtomicLong(); + + /** Milliseconds elapsed at previous tick. */ + private final AtomicLong prevMsElapsed = new AtomicLong(); + + /** Checkpoint written pages at previous tick. */ + private final AtomicLong prevCpWrittenPages = new AtomicLong(); + + /** Checkpoint fsync()'ed pages at previous tick. */ + private final AtomicLong prevCpSyncedPages = new AtomicLong(); + + /** WAL pointer at previous tick reference. */ + private final AtomicReference<FileWALPointer> prevWalPtrRef = new AtomicReference<>(); + + /** Milliseconds at start of watchdog execution. */ + private long msStart; + + /** + * Creates watchdog. + * + * @param ignite Ignite. + * @param operation Operation name for log. + * @param clientThreadsName Client threads name. + */ + ProgressWatchdog(Ignite ignite, String operation, + String clientThreadsName) throws IgniteCheckedException, IOException { + this.ignite = ignite; + this.operation = operation; + txtWriter = new FileWriter(new File(getTempDirFile(), "watchdog-" + operation + ".txt")); + this.clientThreadsName = clientThreadsName; + line("sec", + "cur." + operation + "/sec", + "WAL speed, MB/s.", + "cp. speed, MB/sec", + "cp. sync., MB/sec", + "WAL work seg.", + "Throttle part time", + "curDirtyRatio", + "targetDirtyRatio", + "throttleWeigth", + "markDirtySpeed", + "cpWriteSpeed", + "estMarkAllSpeed", + "avg." + operation + "/sec", + "dirtyPages", + "cpWrittenPages", + "cpSyncedPages", + "cpEvictedPages", + "WAL idx", + "Arch. idx", + "WAL Archive seg."); + } + + /** + * @return temp dir to place watchdog report. + * @throws IgniteCheckedException if failed. + */ + @NotNull private static File getTempDirFile() throws IgniteCheckedException { + File tempDir = new File(U.defaultWorkDirectory(), "temp"); + + if (!tempDir.exists()) + tempDir.mkdirs(); + + return tempDir; + } + + /** + * Generates limited thread dump with only threads involved into persistence. + * + * @return string to log. + */ + private String generateThreadDump() { + final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + int depth = 100; + final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), depth); + final StringBuilder dump = new StringBuilder(); + + for (ThreadInfo threadInfo : threadInfos) { + String name = threadInfo.getThreadName(); + + if (name.contains("checkpoint-runner") + || name.contains("db-checkpoint-thread") + || name.contains("wal-file-archiver") + || name.contains("data-streamer") + || (clientThreadsName!=null && name.contains(clientThreadsName))) { + String str = threadInfo.toString(); + + if (name.contains("db-checkpoint-thread")) { + dump.append(str); + dump.append("(Full stacktrace)"); + + StackTraceElement[] stackTrace = threadInfo.getStackTrace(); + int i = 0; + + for (; i < stackTrace.length && i < depth; i++) { + StackTraceElement ste = stackTrace[i]; + dump.append("\tat ").append(ste.toString()); + dump.append('\n'); + } + + if (i < stackTrace.length) { + dump.append("\t..."); + dump.append('\n'); + } + + dump.append('\n'); + } + else + dump.append(str); + } + else { + String s = threadInfo.toString(); + + if (s.contains(FileWriteAheadLogManager.class.getSimpleName()) + || s.contains(FilePageStoreManager.class.getSimpleName())) + dump.append(s); + } + } + return dump.toString(); + } + + /** + * Adds line to txt log {@link #txtWriter}. + * @param parms values to log + */ + private void line(Object... parms) { + try { + for (int i = 0; i < parms.length; i++) { + Object parm = parms[i]; + String delim = (i < parms.length - 1) ? "\t" : "\n"; + + txtWriter.write(parm + delim); + } + + txtWriter.flush(); + } + catch (IOException ignored) { + } + } + + /** + * Starts watchdog execution. + */ + public void start() { + msStart = U.currentTimeMillis(); + prevMsElapsed.set(0); + prevRecordsCnt.set(0); + prevCpWrittenPages.set(0); + prevCpSyncedPages.set(0); + prevWalPtrRef.set(null); + + svc.scheduleAtFixedRate( + this::tick, + CHECK_PERIOD_MSEC, CHECK_PERIOD_MSEC, TimeUnit.MILLISECONDS); + } + + /** + * Regular method printing statistics to out and to log. Checks gaps in progress. + */ + private void tick() { + long elapsedMs = U.currentTimeMillis() - msStart; + final long totalCnt = overallRecordsProcessed.longValue(); + long elapsedMsFromPrevTick = elapsedMs - prevMsElapsed.getAndSet(elapsedMs); + if (elapsedMsFromPrevTick == 0) + return; + + final long currPutPerSec = ((totalCnt - prevRecordsCnt.getAndSet(totalCnt)) * 1000) / elapsedMsFromPrevTick; + final long averagePutPerSec = totalCnt * 1000 / elapsedMs; + boolean slowProgress = currPutPerSec < averagePutPerSec / 10 && !stopping; + final String fileNameWithDump = slowProgress ? reactNoProgress() : ""; + + DataStorageConfiguration dsCfg = ignite.configuration().getDataStorageConfiguration(); + + String defRegName = dsCfg.getDefaultDataRegionConfiguration().getName(); + long dirtyPages = -1; + for (DataRegionMetrics m : ignite.dataRegionMetrics()) + if (m.getName().equals(defRegName)) + dirtyPages = m.getDirtyPages(); + + GridCacheSharedContext<Object, Object> cacheSctx = null; + PageMemoryImpl pageMemory = null; + try { + cacheSctx = ((IgniteEx)ignite).context().cache().context(); + pageMemory = (PageMemoryImpl)cacheSctx.database().dataRegion(defRegName).pageMemory(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + long cpBufPages = 0; + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)(cacheSctx.database()); + AtomicInteger wrPageCntr = db.writtenPagesCounter(); + long cpWrittenPages = wrPageCntr == null ? 0 : wrPageCntr.get(); + + AtomicInteger syncedPagesCntr = db.syncedPagesCounter(); + int cpSyncedPages = syncedPagesCntr == null ? 0 : syncedPagesCntr.get(); + + AtomicInteger evictedPagesCntr = db.evictedPagesCntr(); + int cpEvictedPages = evictedPagesCntr == null ? 0 : evictedPagesCntr.get(); + + int pageSize = pageMemory == null ? 0 : pageMemory.pageSize(); + + String cpWriteSpeed = getMBytesPrintable( + detectDelta(elapsedMsFromPrevTick, cpWrittenPages, prevCpWrittenPages) * pageSize); + + String cpSyncSpeed = getMBytesPrintable( + detectDelta(elapsedMsFromPrevTick, cpSyncedPages, prevCpSyncedPages) * pageSize); + + String walSpeed = ""; + long throttleParkTimeNanos = 0; + double curDirtyRatio = 0.0; + String targetDirtyRatioStr = ""; + double closeToThrottle = 0.0; + long idx = -1; + long lastArchIdx = -1; + int walArchiveSegments = 0; + long walWorkSegments = 0; + long markDirtySpeed = 0; + long cpWriteSpeedInPages = 0; + long estWrAllSpeed = 0; + + try { + if (pageMemory != null) { + cpBufPages = pageMemory.checkpointBufferPagesCount(); + + PagesWriteSpeedBasedThrottle throttle = U.field(pageMemory, "writeThrottle"); + + if (throttle != null) { + curDirtyRatio = throttle.getCurrDirtyRatio(); + + double targetDirtyRatio = throttle.getTargetDirtyRatio(); + + targetDirtyRatioStr = targetDirtyRatio < 0 ? "" : formatDbl(targetDirtyRatio); + + closeToThrottle = throttle.throttleWeight(); + throttleParkTimeNanos = throttle.throttleParkTime(); + markDirtySpeed = throttle.getMarkDirtySpeed(); + cpWriteSpeedInPages = throttle.getCpWriteSpeed(); + estWrAllSpeed = throttle.getLastEstimatedSpeedForMarkAll(); + if (estWrAllSpeed > 99_999) + estWrAllSpeed = 99_999; + } + } + + FileWriteAheadLogManager wal = (FileWriteAheadLogManager)cacheSctx.wal(); + + idx = 0; + lastArchIdx = 0; + + walArchiveSegments = wal.walArchiveSegments(); + walWorkSegments = idx - lastArchIdx; + + /* // uncomment when currentWritePointer is available + FileWALPointer ptr = wal.currentWritePointer(); + FileWALPointer prevWalPtr = this.prevWalPtrRef.getAndSet(ptr); + + if (prevWalPtr != null) { + long idxDiff = ptr.index() - prevWalPtr.index(); + long offDiff = ptr.fileOffset() - prevWalPtr.fileOffset(); + long bytesDiff = idxDiff * maxWalSegmentSize + offDiff; + + long bytesPerSec = (bytesDiff * 1000) / elapsedMsFromPrevTick; + + walSpeed = getMBytesPrintable(bytesPerSec); + } else + walSpeed = "0"; + */ + + walSpeed = "0"; + } + catch (Exception e) { + X.error(e.getClass().getSimpleName() + ":" + e.getMessage()); + } + + long elapsedSecs = elapsedMs / 1000; + X.println(" >> " + + operation + + " done: " + totalCnt + "/" + elapsedSecs + "s, " + + "Cur. " + operation + " " + currPutPerSec + " recs/sec " + + "cpWriteSpeed=" + cpWriteSpeed + " " + + "cpSyncSpeed=" + cpSyncSpeed + " " + + "walSpeed= " + walSpeed + " " + + "walWorkSeg.="+walWorkSegments + " " + + "markDirtySpeed=" + markDirtySpeed +" " + + "Avg. " + operation + " " + averagePutPerSec + " recs/sec, " + + "dirtyP=" + dirtyPages + ", " + + "cpWrittenP.=" + cpWrittenPages + ", " + + "cpBufP.=" + cpBufPages + " " + + "threshold=" + targetDirtyRatioStr + " " + + "walIdx=" + idx + " " + + "archWalIdx=" + lastArchIdx + " " + + "walArchiveSegments=" + walArchiveSegments + " " + + fileNameWithDump); + + line(elapsedSecs, + currPutPerSec, + walSpeed, + cpWriteSpeed, + cpSyncSpeed, + walWorkSegments, + throttleParkTimeNanos, + formatDbl(curDirtyRatio), + targetDirtyRatioStr, + formatDbl(closeToThrottle), + markDirtySpeed, + cpWriteSpeedInPages, + estWrAllSpeed, + averagePutPerSec, + dirtyPages, + cpWrittenPages, + cpSyncedPages, + cpEvictedPages, + idx, + lastArchIdx, + walArchiveSegments + ); + } + + /** + * @param val value to log. + * @return formatted value for txt log. + */ + private String formatDbl(double val) { + return String.format("%.2f", val).replace(",", "."); + } + + /** + * Converts bytes counter to MegaBytes + * @param currBytesWritten bytes. + * @return string with megabytes as printable string. + */ + private String getMBytesPrintable(long currBytesWritten) { + double cpMbPs = 1.0 * currBytesWritten / (1024 * 1024); + + return formatDbl(cpMbPs); + } + + /** + * @param elapsedMsFromPrevTick time from previous tick, millis. + * @param absVal current value + * @param cnt counter stores previous value. + * @return value change from previous tick. + */ + private long detectDelta(long elapsedMsFromPrevTick, long absVal, AtomicLong cnt) { + long cpPagesChange = absVal - cnt.getAndSet(absVal); + + if (cpPagesChange < 0) + cpPagesChange = 0; + + return (cpPagesChange * 1000) / elapsedMsFromPrevTick; + } + + /** + * @return file name with dump created. + */ + private String reactNoProgress() { + try { + String threadDump = generateThreadDump(); + + long sec = TimeUnit.MILLISECONDS.toSeconds(U.currentTimeMillis() - msStart); + + String fileName = "dumpAt" + sec + "second.txt"; + + if (threadDump.contains(IgniteCacheDatabaseSharedManager.class.getName() + ".checkpointLock")) + fileName = "checkpoint_" + fileName; + + fileName = operation + fileName; + + try (FileWriter writer = new FileWriter(new File(getTempDirFile(), fileName))) { + writer.write(threadDump); + } + + return fileName; + } + catch (IOException | IgniteCheckedException e) { + e.printStackTrace(); + } + return ""; + } + + /** + * @param cnt counter of entries/operations processed since last call. + */ + public void reportProgress(int cnt) { + overallRecordsProcessed.add(cnt); + } + + /** + * Call this method to indicate client operation is done, and ignite is stopping. + */ + public void stopping() { + stopping = true; + } + + /** + * Stops watchdog threads. + */ + public void stop() { + U.closeQuiet(txtWriter); + + ScheduledExecutorService pool = this.svc; + stopPool(pool); + } + + public static void stopPool(ExecutorService pool) { + pool.shutdown(); + try { + pool.awaitTermination(10, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 13e811c..1b6fde3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -84,7 +84,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { } }, new DataRegionMetricsImpl(new DataRegionConfiguration()), - false + PageMemoryImpl.ThrottlingPolicy.NONE ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index ce5deba..1180164 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -84,7 +84,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest } }, new DataRegionMetricsImpl(new DataRegionConfiguration()), - false + PageMemoryImpl.ThrottlingPolicy.NONE ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java new file mode 100644 index 0000000..054696c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.logger.NullLogger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * + */ +public class IgniteThrottlingUnitTest { + /** Logger. */ + private IgniteLogger log = new NullLogger(); + + /** Page memory 2 g. */ + private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class); + + { + when(pageMemory2g.totalPages()).thenReturn((2L * 1024 * 1024 * 1024) / 4096); + } + + /** + * + */ + @Test + public void breakInCaseTooFast() { + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + + long time = throttle.getParkTime(0.67, + (362584 + 67064) / 2, + 328787, + 1, + 60184, + 23103); + + assertTrue(time > 0); + } + + /** + * + */ + @Test + public void noBreakIfNotFastWrite() { + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + + long time = throttle.getParkTime(0.47, + ((362584 + 67064) / 2), + 328787, + 1, + 20103, + 23103); + + assertTrue(time == 0); + } + + /** + * @throws InterruptedException if interrupted. + */ + @Test + public void averageCalculation() throws InterruptedException { + IntervalBasedMeasurement measurement = new IntervalBasedMeasurement(100, 1); + + for (int i = 0; i < 1000; i++) + measurement.addMeasurementForAverageCalculation(100); + + assertEquals(100, measurement.getAverage()); + + Thread.sleep(220); + + assertEquals(0, measurement.getAverage()); + + assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime())); + } + + /** + * @throws InterruptedException if interrupted. + */ + @Test + public void speedCalculation() throws InterruptedException { + IntervalBasedMeasurement measurement = new IntervalBasedMeasurement(100, 1); + + for (int i = 0; i < 1000; i++) + measurement.setCounter(i, System.nanoTime()); + + long speed = measurement.getSpeedOpsPerSec(System.nanoTime()); + System.out.println("speed measured " + speed); + assertTrue(speed > 1000); + + Thread.sleep(230); + + assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime())); + } + + /** + * @throws InterruptedException if interrupted. + */ + @Test + public void speedWithDelayCalculation() throws InterruptedException { + IntervalBasedMeasurement measurement = new IntervalBasedMeasurement(100, 1); + + int runs = 10; + int nanosPark = 100; + int multiplier = 100000; + for (int i = 0; i < runs; i++) { + measurement.setCounter(i * multiplier, System.nanoTime()); + + LockSupport.parkNanos(nanosPark); + } + + long speed = measurement.getSpeedOpsPerSec(System.nanoTime()); + + assertTrue(speed > 0); + long maxSpeed = (TimeUnit.SECONDS.toNanos(1) * multiplier * runs) / ((long)(runs * nanosPark)); + assertTrue(speed < maxSpeed); + + Thread.sleep(200); + + assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime())); + } + + /** + * + */ + @Test + public void beginOfCp() { + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + + assertTrue(throttle.getParkTime(0.01, 100,400000, + 1, + 20103, + 23103) == 0); + + //mark speed 22413 for mark all remaining as dirty + long time = throttle.getParkTime(0.024, 100, 400000, + 1, + 24000, + 23103); + assertTrue(time > 0); + + assertTrue(throttle.getParkTime(0.01, + 100, + 400000, + 1, + 22412, + 23103) == 0); + } + + /** + * + */ + @Test + public void enforceThrottleAtTheEndOfCp() { + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + + long time1 = throttle.getParkTime(0.70, 300000, 400000, + 1, 20200, 23000); + long time2 = throttle.getParkTime(0.71, 300000, 400000, + 1, 20200, 23000); + + assertTrue(time2 >= time1 * 2); // extra slowdown should be applied. + + long time3 = throttle.getParkTime(0.73, 300000, 400000, + 1, 20200, 23000); + long time4 = throttle.getParkTime(0.74, 300000, 400000, + 1, 20200, 23000); + + assertTrue(time3 > time2); + assertTrue(time4 > time3); + } + + /** + * + */ + @Test + public void tooMuchPagesMarkedDirty() { + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + + // 363308 350004 348976 10604 + long time = throttle.getParkTime(0.75, + ((350004 + 348976) / 2), + 350004-10604, + 4, + 279, + 23933); + + System.err.println(time); + + assertTrue(time == 0); + } + + /** + * + */ + @Test + public void warningInCaseTooMuchThrottling() { + AtomicInteger warnings = new AtomicInteger(0); + IgniteLogger log = mock(IgniteLogger.class); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + + System.out.println("log.info() called with arguments: " + Arrays.toString(args)); + + warnings.incrementAndGet(); + + return null; + }).when(log).info(anyString()); + + AtomicInteger written = new AtomicInteger(); + GridCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class); + when(db.checkpointLockIsHeldByThread()).thenReturn(true); + when(db.writtenPagesCounter()).thenReturn(written); + + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, db, log) { + @Override protected void doPark(long throttleParkTimeNs) { + //do nothing + } + }; + throttle.onBeginCheckpoint(); + written.set(200); //emulating some pages written + + for (int i = 0; i < 100000; i++) { + //emulating high load on marking + throttle.onMarkDirty(false); + + if (throttle.throttleWeight() > PagesWriteSpeedBasedThrottle.WARN_THRESHOLD) + break; + } + + for (int i = 0; i < 1000; i++) { + //emulating additional page writes to be sure log message is generated + + throttle.onMarkDirty(false); + + if(warnings.get()>0) + break; + } + + System.out.println(throttle.throttleWeight()); + + assertTrue(warnings.get() > 0); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java index effbe7e..04f3bd0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java @@ -99,7 +99,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest { } }, new DataRegionMetricsImpl(new DataRegionConfiguration()), - false + PageMemoryImpl.ThrottlingPolicy.NONE ); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index 3d806b1..ed285c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -90,7 +90,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { } }, new DataRegionMetricsImpl(new DataRegionConfiguration()), - false + PageMemoryImpl.ThrottlingPolicy.NONE ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 5c91c59..1369f28 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -122,7 +122,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { } }, new DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()), - false + PageMemoryImpl.ThrottlingPolicy.NONE ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index a7c549d..f957aec 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -19,7 +19,6 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistence; -import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicCacheTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest; http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java new file mode 100644 index 0000000..a2bfeb3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.testsuites; + +import org.apache.ignite.internal.processors.cache.persistence.pagemem.IgniteThrottlingUnitTest; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + IgniteThrottlingUnitTest.class +}) +public class IgnitePdsUnitTestSuite { +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b50aa5eb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java index 22bd610..0546459 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java @@ -30,12 +30,16 @@ import junit.framework.TestSuite; * This suite is not included into main build */ public class IgniteReproducingSuite extends TestSuite { + /** + * @return suite with test(s) for reproduction some problem. + * @throws Exception if failed. + */ public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Ignite Issue Reproducing Test Suite"); //uncomment to add some test //for (int i = 0; i < 100; i++) - // suite.addTestSuite(); + // suite.addTestSuite(IgniteCheckpointDirtyPagesForLowLoadTest.class); return suite; }