Repository: ignite Updated Branches: refs/heads/master 1649c532f -> f12055115
IGNITE-6334 Throttle writing threads during ongoing checkpoint - Fixes #2710. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1205511 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1205511 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1205511 Branch: refs/heads/master Commit: f120551159fd2eeed8cedc0ca3e3ddc394505737 Parents: 1649c53 Author: Ivan Rakov <ivan.glu...@gmail.com> Authored: Fri Sep 22 12:40:22 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Sep 22 12:40:22 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 5 + .../PersistentStoreConfiguration.java | 28 +- .../GridCacheDatabaseSharedManager.java | 48 ++- .../persistence/pagemem/PageMemoryImpl.java | 96 +++++- .../persistence/pagemem/PagesWriteThrottle.java | 104 ++++++ .../pagemem/BPlusTreePageMemoryImplTest.java | 4 +- .../BPlusTreeReuseListPageMemoryImplTest.java | 3 +- .../MetadataStoragePageMemoryImplTest.java | 4 +- .../pagemem/PageMemoryImplNoLoadTest.java | 4 +- .../persistence/pagemem/PageMemoryImplTest.java | 4 +- .../pagemem/PagesWriteThrottleSandboxTest.java | 264 +++++++++++++++ .../pagemem/PagesWriteThrottleSmokeTest.java | 322 +++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite.java | 4 + 13 files changed, 866 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 5f1839b..39e65e1 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Properties; import javax.net.ssl.HostnameVerifier; import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -726,6 +727,10 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_WAL_LOG_TX_RECORDS = "IGNITE_WAL_LOG_TX_RECORDS"; + /** If this property is set, {@link PersistentStoreConfiguration#writeThrottlingEnabled} will be overridden to true + * independent of initial value in configuration. */ + public static final String IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED = "IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index abca5a5..c44e92d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -84,6 +84,9 @@ public class PersistentStoreConfiguration implements Serializable { /** Default wal archive directory. */ public static final String DFLT_WAL_ARCHIVE_PATH = "db/wal/archive"; + /** Default write throttling enabled. 
*/ + public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false; + /** */ private String persistenceStorePath; @@ -162,6 +165,11 @@ public class PersistentStoreConfiguration implements Serializable { private long walAutoArchiveAfterInactivity = -1; /** + * If true, threads that generate dirty pages too fast during ongoing checkpoint will be throttled. + */ + private boolean writeThrottlingEnabled = DFLT_WRITE_THROTTLING_ENABLED; + + /** * Returns a path the root directory where the Persistent Store will persist data and indexes. */ public String getPersistentStorePath() { @@ -240,7 +248,7 @@ public class PersistentStoreConfiguration implements Serializable { /** * Sets a number of threads to use for the checkpointing purposes. * - * @param checkpointingThreads Number of checkpointing threads. One thread is used by default. + * @param checkpointingThreads Number of checkpointing threads. Four threads are used by default. * @return {@code this} for chaining. */ public PersistentStoreConfiguration setCheckpointingThreads(int checkpointingThreads) { @@ -402,6 +410,24 @@ public class PersistentStoreConfiguration implements Serializable { } /** + * Gets flag indicating whether write throttling is enabled. + */ + public boolean isWriteThrottlingEnabled() { + return writeThrottlingEnabled; + } + + /** + * Sets flag indicating whether write throttling is enabled. + * + * @param writeThrottlingEnabled Write throttling enabled flag. + */ + public PersistentStoreConfiguration setWriteThrottlingEnabled(boolean writeThrottlingEnabled) { + this.writeThrottlingEnabled = writeThrottlingEnabled; + + return this; + } + + /** * Gets the length of the time interval for rate-based metrics. This interval defines a window over which * hits will be tracked. Default value is {@link #DFLT_RATE_TIME_INTERVAL_MILLIS}. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 5f03d8f..62210dc 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -301,6 +301,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private ObjectName persistenceMetricsMbeanName; + /** Counter for written checkpoint pages. Not null only if checkpoint is running. */ + private volatile AtomicInteger writtenPagesCntr = null; + + /** Number of pages in current checkpoint. */ + private volatile int currCheckpointPagesCnt; + /** * @param ctx Kernal context. 
*/ @@ -667,6 +673,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan "Checkpoint page buffer size is too big, setting to an adjusted cache size [size=" + U.readableSize(cacheSize, false) + ", memPlc=" + plcCfg.getName() + ']'); + boolean writeThrottlingEnabled = persistenceCfg.isWriteThrottlingEnabled(); + + if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, false)) + writeThrottlingEnabled = true; + PageMemoryImpl pageMem = new PageMemoryImpl( memProvider, calculateFragmentSizes( @@ -699,7 +710,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } }, this, - memMetrics + memMetrics, + writeThrottlingEnabled ); memMetrics.pageMemory(pageMem); @@ -942,7 +954,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquiSnapshotWorkerre memory + * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquireSnapshotWorker memory * state. */ @SuppressWarnings("LockAcquiredButNotSafelyReleased") @@ -1906,6 +1918,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Counter for written checkpoint pages. Not null only if checkpoint is running. + */ + public AtomicInteger writtenPagesCounter() { + return writtenPagesCntr; + } + + /** + * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0. + */ + public int currentCheckpointPagesCount() { + return currCheckpointPagesCnt; + } + + /** * @param cpTs Checkpoint timestamp. * @param cpId Checkpoint ID. * @param type Checkpoint type. 
@@ -2038,6 +2064,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Checkpoint chp = markCheckpointBegin(tracker); + currCheckpointPagesCnt = chp.pagesSize; + + writtenPagesCntr = new AtomicInteger(); + boolean interrupted = true; try { @@ -2049,7 +2079,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan asyncRunner == null ? 1 : chp.cpPages.collectionsSize()); tracker.onPagesWriteStart(); - final AtomicInteger writtenPagesCtr = new AtomicInteger(); + final int totalPagesToWriteCnt = chp.cpPages.size(); if (asyncRunner != null) { @@ -2059,7 +2089,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan chp.cpPages.innerCollection(i), updStores, doneWriteFut, - writtenPagesCtr, totalPagesToWriteCnt ); @@ -2078,7 +2107,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan chp.cpPages, updStores, doneWriteFut, - writtenPagesCtr, totalPagesToWriteCnt); write.run(); @@ -2402,6 +2430,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan chp.cpEntry.checkpointMark(), null, CheckpointEntryType.END); + + writtenPagesCntr = null; + + currCheckpointPagesCnt = 0; } checkpointHist.onCheckpointFinished(chp); @@ -2498,9 +2530,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private CountDownFuture doneFut; - /** Counter for all written pages. May be shared between several workers */ - private AtomicInteger writtenPagesCntr; - /** Total pages to write, counter may be greater than {@link #writePageIds} size */ private final int totalPagesToWrite; @@ -2511,7 +2540,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param writePageIds Collection of page IDs to write. 
* @param updStores * @param doneFut - * @param writtenPagesCntr all written pages counter, may be shared between several write tasks * @param totalPagesToWrite total pages to be written under this checkpoint */ private WriteCheckpointPages( @@ -2519,13 +2547,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final Collection<FullPageId> writePageIds, final GridConcurrentHashSet<PageStore> updStores, final CountDownFuture doneFut, - @NotNull final AtomicInteger writtenPagesCntr, final int totalPagesToWrite) { this.tracker = tracker; this.writePageIds = writePageIds; this.updStores = updStores; this.doneFut = doneFut; - this.writtenPagesCntr = writtenPagesCntr; this.totalPagesToWrite = totalPagesToWrite; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index dbb64f8..1da17b5 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -56,6 +57,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; 
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO; @@ -179,6 +181,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** State checker. */ private final CheckpointLockStateChecker stateChecker; + /** Number of used pages in checkpoint buffer. */ + private final AtomicInteger cpBufPagesCntr = new AtomicInteger(0); + /** */ private ExecutorService asyncRunner = new ThreadPoolExecutor( 0, @@ -217,6 +222,12 @@ public class PageMemoryImpl implements PageMemoryEx { /** Flush dirty page closure. When possible, will be called by evictPage(). */ private final GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker; + /** Pages write throttle. */ + private PagesWriteThrottle writeThrottle; + + /** Write throttle enabled flag. */ + private boolean throttleEnabled; + /** */ private boolean pageEvictWarned; @@ -232,6 +243,7 @@ public class PageMemoryImpl implements PageMemoryEx { * @param pageSize Page size. * @param flushDirtyPage Callback invoked when a dirty page is evicted. * @param changeTracker Callback invoked to track changes in pages. + * @param throttleEnabled Write throttle enabled flag. 
*/ public PageMemoryImpl( DirectMemoryProvider directMemoryProvider, @@ -241,7 +253,8 @@ public class PageMemoryImpl implements PageMemoryEx { GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage, GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker, CheckpointLockStateChecker stateChecker, - MemoryMetricsImpl memMetrics + MemoryMetricsImpl memMetrics, + boolean throttleEnabled ) { assert sharedCtx != null; @@ -253,6 +266,7 @@ public class PageMemoryImpl implements PageMemoryEx { this.flushDirtyPage = flushDirtyPage; this.changeTracker = changeTracker; this.stateChecker = stateChecker; + this.throttleEnabled = throttleEnabled; storeMgr = sharedCtx.pageStore(); walMgr = sharedCtx.wal(); @@ -290,7 +304,7 @@ public class PageMemoryImpl implements PageMemoryEx { DirectMemoryRegion cpReg = regions.get(regs - 1); - checkpointPool = new PagePool(regs - 1, cpReg); + checkpointPool = new PagePool(regs - 1, cpReg, cpBufPagesCntr); long checkpointBuf = cpReg.size(); @@ -305,12 +319,14 @@ public class PageMemoryImpl implements PageMemoryEx { totalAllocated += reg.size(); - segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length); + segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length, throttleEnabled); pages += segments[i].pages(); totalTblSize += segments[i].tableSize(); } + initWriteThrottle(); + if (log.isInfoEnabled()) log.info("Started page memory [memoryAllocated=" + U.readableSize(totalAllocated, false) + ", pages=" + pages + @@ -319,6 +335,21 @@ public class PageMemoryImpl implements PageMemoryEx { ']'); } + /** + * + */ + private void initWriteThrottle() { + if (!(sharedCtx.database() instanceof GridCacheDatabaseSharedManager)) { + log.error("Write throttle can't start. 
Unexpected class of database manager: " + + sharedCtx.database().getClass()); + + throttleEnabled = false; + } + + if (throttleEnabled) + writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)sharedCtx.database()); + } + /** {@inheritDoc} */ @SuppressWarnings("OverlyStrongTypeCast") @Override public void stop() throws IgniteException { @@ -774,6 +805,18 @@ public class PageMemoryImpl implements PageMemoryEx { return true; } + /** + * @param dirtyRatioThreshold Throttle threshold. + */ + boolean shouldThrottle(double dirtyRatioThreshold) { + for (Segment segment : segments) { + if (segment.shouldThrottle(dirtyRatioThreshold)) + return true; + } + + return false; + } + /** {@inheritDoc} */ @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() throws IgniteException { Collection[] collections = new Collection[segments.length]; @@ -799,6 +842,9 @@ public class PageMemoryImpl implements PageMemoryEx { @Override public void finishCheckpoint() { for (Segment seg : segments) seg.segCheckpointPages = null; + + if (throttleEnabled) + writeThrottle.onFinishCheckpoint(); } /** {@inheritDoc} */ @@ -1219,6 +1265,9 @@ public class PageMemoryImpl implements PageMemoryEx { try { rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); + + if (throttleEnabled && !restore && markDirty && !dirty) + writeThrottle.onMarkDirty(isInCheckpoint(fullId)); } catch (AssertionError ex) { StringBuilder sb = new StringBuilder(sysPageSize * 2); @@ -1310,6 +1359,20 @@ public class PageMemoryImpl implements PageMemoryEx { } /** + * Number of used pages in checkpoint buffer. + */ + public int checkpointBufferPagesCount() { + return cpBufPagesCntr.get(); + } + + /** + * Number of used pages in checkpoint buffer. + */ + public int checkpointBufferPagesSize() { + return checkpointPool.pages(); + } + + /** * This method must be called in synchronized context. * * @param absPtr Absolute pointer. 
@@ -1385,6 +1448,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** Direct memory region. */ protected final DirectMemoryRegion region; + /** Pool pages counter. */ + protected final AtomicInteger pagesCntr; + /** */ protected long lastAllocatedIdxPtr; @@ -1397,10 +1463,12 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param idx Index. * @param region Region + * @param pagesCntr Pages counter. */ - protected PagePool(int idx, DirectMemoryRegion region) { + protected PagePool(int idx, DirectMemoryRegion region, AtomicInteger pagesCntr) { this.idx = idx; this.region = region; + this.pagesCntr = pagesCntr; long base = (region.address() + 7) & ~0x7; @@ -1427,6 +1495,9 @@ public class PageMemoryImpl implements PageMemoryEx { * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page. */ private long borrowOrAllocateFreePage(long pageId) throws GridOffHeapOutOfMemoryException { + if (pagesCntr != null) + pagesCntr.getAndIncrement(); + long relPtr = borrowFreePage(); return relPtr != INVALID_REL_PTR ? relPtr : allocateFreePage(pageId); @@ -1500,6 +1571,9 @@ public class PageMemoryImpl implements PageMemoryEx { assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr); + if (pagesCntr != null) + pagesCntr.getAndDecrement(); + while (true) { long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr); @@ -1580,8 +1654,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param region Memory region. + * @param throttlingEnabled Write throttling enabled flag. 
*/ - private Segment(int idx, DirectMemoryRegion region, int cpPoolPages) { + private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, boolean throttlingEnabled) { long totalMemory = region.size(); int pages = (int)(totalMemory / sysPageSize); @@ -1596,9 +1671,9 @@ public class PageMemoryImpl implements PageMemoryEx { DirectMemoryRegion poolRegion = region.slice(memPerTbl + 8); - pool = new PagePool(idx, poolRegion); + pool = new PagePool(idx, poolRegion, null); - maxDirtyPages = Math.min(pool.pages() * 2 / 3, cpPoolPages); + maxDirtyPages = throttlingEnabled ? pool.pages() * 3 / 4 : Math.min(pool.pages() * 2 / 3, cpPoolPages); } /** @@ -1609,6 +1684,13 @@ public class PageMemoryImpl implements PageMemoryEx { } /** + * @param dirtyRatioThreshold Throttle threshold. + */ + private boolean shouldThrottle(double dirtyRatioThreshold) { + return ((double)dirtyPages.size()) / pages() > dirtyRatioThreshold; + } + + /** * @return Max number of pages this segment can allocate. */ private int pages() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java new file mode 100644 index 0000000..d0c67c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java @@ -0,0 +1,104 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. 
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; + +/** + * Throttles threads that generate dirty pages during ongoing checkpoint. + * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed. + */ +public class PagesWriteThrottle { + /** Page memory. */ + private final PageMemoryImpl pageMemory; + + /** Database manager. */ + private final GridCacheDatabaseSharedManager dbSharedMgr; + + /** Starting throttle time. Limits write speed to 1000 MB/s. */ + private static final long STARTING_THROTTLE_NANOS = 4000; + + /** Backoff ratio. Each next park will be this times longer. */ + private static final double BACKOFF_RATIO = 1.05; + + /** Exponential backoff counter. */ + private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0); + /** + * @param pageMemory Page memory. + * @param dbSharedMgr Database manager. 
+ */ + public PagesWriteThrottle(PageMemoryImpl pageMemory, GridCacheDatabaseSharedManager dbSharedMgr) { + this.pageMemory = pageMemory; + this.dbSharedMgr = dbSharedMgr; + } + + /** + * + */ + public void onMarkDirty(boolean isInCheckpoint) { + assert dbSharedMgr.checkpointLockIsHeldByThread(); + + AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); + + if (writtenPagesCntr == null) + return; // Don't throttle if checkpoint is not running. + + boolean shouldThrottle = false; + + if (isInCheckpoint) { + int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3; + + shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit; + } + + if (!shouldThrottle) { + int cpWrittenPages = writtenPagesCntr.get(); + + int cpTotalPages = dbSharedMgr.currentCheckpointPagesCount(); + + if (cpWrittenPages == cpTotalPages) { + // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 + shouldThrottle = pageMemory.shouldThrottle(3.0 / 4); + } else { + double dirtyRatioThreshold = ((double)cpWrittenPages) / cpTotalPages; + + // Starting with 0.05 to avoid throttle right after checkpoint start + // 7/12 is maximum ratio of dirty pages + dirtyRatioThreshold = (dirtyRatioThreshold * 0.95 + 0.05) * 7 / 12; + + shouldThrottle = pageMemory.shouldThrottle(dirtyRatioThreshold); + } + } + + if (shouldThrottle) { + int throttleLevel = exponentialBackoffCntr.getAndIncrement(); + + LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel))); + } + else + exponentialBackoffCntr.set(0); + } + + /** + * + */ + public void onFinishCheckpoint() { + exponentialBackoffCntr.set(0); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 6f58782..56d09f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -82,7 +82,9 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index b263d4f..39183b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -82,7 +82,8 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration()) + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false ); mem.start(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java index d9257bd..a427c63 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java @@ -97,6 +97,8 @@ public class MetadataStoragePageMemoryImplTest extends MetadataStorageSelfTest{ return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index 1fff1f0..467ede4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -88,7 +88,9 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { return true; } }, - new 
MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 0366eca..c5997fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -110,7 +110,9 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { return true; } }, - new MemoryMetricsImpl(new MemoryPolicyConfiguration())); + new MemoryMetricsImpl(new MemoryPolicyConfiguration()), + false + ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java new file mode 100644 index 0000000..409ab84 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSandboxTest.java @@ -0,0 +1,264 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. 
See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.io.Serializable; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.MemoryMetrics; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; +import org.apache.ignite.internal.util.typedef.internal.S; 
+import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test to visualize and debug {@link PagesWriteThrottle}. + * Prints puts/gets rate, number of dirty pages, pages written in current checkpoint and pages in checkpoint buffer. + * Not intended to be part of any test suite. + */ +public class PagesWriteThrottleSandboxTest extends GridCommonAbstractTest { + /** IP finder passed to the discovery SPI in {@link #getConfiguration(String)}. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Name of the cache configured in {@link #getConfiguration(String)} and loaded by the test. */ + private static final String CACHE_NAME = "cache1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + discoverySpi.setIpFinder(ipFinder); + + MemoryConfiguration dbCfg = new MemoryConfiguration(); + + dbCfg.setMemoryPolicies(new MemoryPolicyConfiguration() + .setMaxSize(4000L * 1024 * 1024) + .setName("dfltMemPlc") + .setMetricsEnabled(true)); + + dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + + cfg.setMemoryConfiguration(dbCfg); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setName(CACHE_NAME); + ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg1.setAffinity(new RendezvousAffinityFunction(false, 64)); + + cfg.setCacheConfiguration(ccfg1); + + cfg.setPersistentStoreConfiguration( + new PersistentStoreConfiguration() + .setWalMode(WALMode.BACKGROUND) + .setCheckpointingFrequency(20_000) +
.setCheckpointingPageBufferSize(1000L * 1000 * 1000) + .setWriteThrottlingEnabled(true)); + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 100 * 60 * 1000; + } + + /** + * @throws Exception if failed. + */ + public void testThrottle() throws Exception { + startGrids(1).active(true); + + try { + final Ignite ig = ignite(0); + + final int keyCnt = 4_000_000; + + final AtomicBoolean run = new AtomicBoolean(true); + + final HitRateMetrics getRate = new HitRateMetrics(5000, 5); + + GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (run.get()) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int key = rnd.nextInt(keyCnt * 2); + + ignite(0).cache(CACHE_NAME).get(key); + + getRate.onHit(); + } + + return null; + } + }, 2, "read-loader"); + + final HitRateMetrics putRate = new HitRateMetrics(1000, 5); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + while (run.get()) { + long dirtyPages = 0; + + for (MemoryMetrics m : ig.memoryMetrics()) + if (m.getName().equals("dfltMemPlc")) + dirtyPages = m.getDirtyPages(); + + long cpBufPages = 0; + + long cpWrittenPages; + + AtomicInteger cntr = ((GridCacheDatabaseSharedManager)(((IgniteEx)ignite(0)) + .context().cache().context().database())).writtenPagesCounter(); + + cpWrittenPages = cntr == null ?
0 : cntr.get(); + + try { + cpBufPages = ((PageMemoryImpl)((IgniteEx)ignite(0)).context().cache().context().database() + .memoryPolicy("dfltMemPlc").pageMemory()).checkpointBufferPagesCount(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + System.out.println("@@@ putsPerSec=," + (putRate.getRate()) + ", getsPerSec=," + (getRate.getRate()) + ", dirtyPages=," + dirtyPages + ", cpWrittenPages=," + cpWrittenPages +", cpBufPages=," + cpBufPages); + + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }, "metrics-view"); + + try (IgniteDataStreamer<Object, Object> ds = ig.dataStreamer(CACHE_NAME)) { + ds.allowOverwrite(true); + + for (int i = 0; i < keyCnt * 10; i++) { + ds.addData(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(), + ThreadLocalRandom.current().nextInt())); + + putRate.onHit(); + } + } + + run.set(false); + } + finally { + stopAllGrids(); + } + } + + /** + * Cache value made of two ints and a filler byte array; only the two ints take part in equality and hashing. + */ + private static class TestValue implements Serializable { + /** Value 1. */ + private final int v1; + + /** Value 2. */ + private final int v2; + + /** Filler array of 400 to 419 bytes, sized randomly per instance; not compared by equals and not hashed by hashCode. */ + private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)]; + + /** + * @param v1 Value 1. + * @param v2 Value 2. + */ + private TestValue(int v1, int v2) { + this.v1 = v1; + this.v2 = v2; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue val = (TestValue)o; + + return v1 == val.v1 && v2 == val.v2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = v1; + + res = 31 * res + v2; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * @throws IgniteCheckedException If failed.
+ */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java new file mode 100644 index 0000000..12a601d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java @@ -0,0 +1,322 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License.
+*/ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Starts two nodes with write throttling enabled and artificially slowed checkpoint writes, loads the cache with puts for up to three minutes and fails if the 10-second put rate drops to zero. + */ +public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest { + /** IP finder passed to the discovery SPI in {@link #getConfiguration(String)}. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** While {@code true}, writes issued by threads whose name contains {@code checkpoint} are delayed by the slow file I/O factory. */ + private final AtomicBoolean slowCheckpointEnabled = new AtomicBoolean(true); + + /** Name of the cache configured in {@link #getConfiguration(String)} and loaded by the test. */ + private static final String CACHE_NAME = "cache1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + discoverySpi.setIpFinder(ipFinder); + + MemoryConfiguration dbCfg = new MemoryConfiguration(); + + dbCfg.setMemoryPolicies(new MemoryPolicyConfiguration() + .setMaxSize(400 * 1024 * 1024) + .setName("dfltMemPlc") + .setMetricsEnabled(true)); + + dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); + + cfg.setMemoryConfiguration(dbCfg); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setName(CACHE_NAME); + ccfg1.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC); + ccfg1.setAffinity(new RendezvousAffinityFunction(false, 64)); + + cfg.setCacheConfiguration(ccfg1); + + cfg.setPersistentStoreConfiguration( + new PersistentStoreConfiguration() + .setWalMode(WALMode.BACKGROUND) + .setCheckpointingFrequency(20_000) + .setCheckpointingPageBufferSize(200 * 1000 * 1000) + .setWriteThrottlingEnabled(true) + .setCheckpointingThreads(1) + .setFileIOFactory(new SlowCheckpointFileIOFactory())); + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteWorkFiles(); + + slowCheckpointEnabled.set(true); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws
Exception { + super.afterTest(); + + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 6 * 60 * 1000; + } + + /** + * @throws Exception if failed. + */ + public void testThrottle() throws Exception { + startGrids(2).active(true); + + try { + Ignite ig = ignite(0); + + final int keyCnt = 2_000_000; + + final AtomicBoolean run = new AtomicBoolean(true); + + final AtomicBoolean zeroDropdown = new AtomicBoolean(false); + + final HitRateMetrics putRate10secs = new HitRateMetrics(10_000, 20); + + final HitRateMetrics putRate1sec = new HitRateMetrics(1_000, 20); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + Thread.sleep(5000); + + while (run.get()) { + System.out.println( + "Put rate over last 10 seconds: " + (putRate10secs.getRate() / 10) + + " puts/sec, over last 1 second: " + putRate1sec.getRate()); + + if (putRate10secs.getRate() == 0) { + zeroDropdown.set(true); + + run.set(false); + } + + Thread.sleep(1000); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + run.set(false); + } + } + }, "rate-checker"); + + final IgniteCache<Integer, TestValue> cache = ig.getOrCreateCache(CACHE_NAME); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + long startTs = System.currentTimeMillis(); + + for (int i = 0; i < keyCnt * 10 && System.currentTimeMillis() - startTs < 3 * 60 * 1000; i++) { + if (!run.get()) + break; + + cache.put(ThreadLocalRandom.current().nextInt(keyCnt), new TestValue(ThreadLocalRandom.current().nextInt(), + ThreadLocalRandom.current().nextInt())); + + putRate10secs.onHit(); + + putRate1sec.onHit(); + } + + run.set(false); + } + }, "loader"); + + while (run.get()) + LockSupport.parkNanos(10_000); + + if (zeroDropdown.get()) { + slowCheckpointEnabled.set(false); + + IgniteInternalFuture cpFut1 = ((IgniteEx)ignite(0)).context().cache().context().database() + .wakeupForCheckpoint("test"); + +
IgniteInternalFuture cpFut2 = ((IgniteEx)ignite(1)).context().cache().context().database() + .wakeupForCheckpoint("test"); + + cpFut1.get(); + + cpFut2.get(); + + fail("Put rate degraded to zero for at least 10 seconds"); + } + } + finally { + stopAllGrids(); + } + } + + /** + * Cache value made of two ints and a filler byte array; only the two ints take part in equality and hashing. + */ + private static class TestValue implements Serializable { + /** Value 1. */ + private final int v1; + + /** Value 2. */ + private final int v2; + + /** Filler array of 400 to 419 bytes, sized randomly per instance; not compared by equals and not hashed by hashCode. */ + private byte[] payload = new byte[400 + ThreadLocalRandom.current().nextInt(20)]; + + /** + * @param v1 Value 1. + * @param v2 Value 2. + */ + private TestValue(int v1, int v2) { + this.v1 = v1; + this.v2 = v2; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue val = (TestValue)o; + + return v1 == val.v1 && v2 == val.v2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = v1; + + res = 31 * res + v2; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "snapshot", false)); + } + + /** + * Creates file I/O that emulates poor checkpoint write speed: while {@code slowCheckpointEnabled} is set, every write made by a thread whose name contains {@code checkpoint} first parks for up to 5 ms. + */ + private class SlowCheckpointFileIOFactory implements FileIOFactory { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Factory of the real file I/O that the slowing decorator wraps.
*/ + private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory(); + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, "rw"); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, String mode) throws IOException { + final FileIO delegate = delegateFactory.create(file, mode); + + return new FileIODecorator(delegate) { + @Override public int write(ByteBuffer srcBuf) throws IOException { + if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint")) + LockSupport.parkNanos(5_000_000); + + return delegate.write(srcBuf); + } + + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint")) + LockSupport.parkNanos(5_000_000); + + return delegate.write(srcBuf, position); + } + + @Override public void write(byte[] buf, int off, int len) throws IOException { + if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint")) + LockSupport.parkNanos(5_000_000); + + delegate.write(buf, off, len); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f1205511/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index b2a1f65..ef7682f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTree import org.apache.ignite.internal.processors.cache.persistence.pagemem.MetadataStoragePageMemoryImplTest; import
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplNoLoadTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest; import org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest; import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest; import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest; @@ -80,6 +81,9 @@ public class IgnitePdsTestSuite extends TestSuite { suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class); + // Write throttling + suite.addTestSuite(PagesWriteThrottleSmokeTest.class); + return suite; } }