IGNITE-8955 Checkpoint can't get write lock if massive eviction is started on node start
Signed-off-by: Ivan Rakov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a0fa79a8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a0fa79a8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a0fa79a8 Branch: refs/heads/ignite-8446 Commit: a0fa79a8d297654973ec8bb890485e8e8818594c Parents: 6ad291d Author: Eduard Shangareev <[email protected]> Authored: Wed Jul 11 19:43:19 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Wed Jul 11 19:43:19 2018 +0300 ---------------------------------------------------------------------- .../IgniteAuthenticationProcessor.java | 7 +- .../dht/preloader/GridDhtPartitionDemander.java | 107 +++--- .../GridCacheDatabaseSharedManager.java | 169 +++++---- .../persistence/GridCacheOffheapManager.java | 1 + .../persistence/pagemem/PageMemoryImpl.java | 18 +- .../pagemem/PagesWriteSpeedBasedThrottle.java | 16 +- .../persistence/pagemem/PagesWriteThrottle.java | 19 +- .../pagemem/PagesWriteThrottlePolicy.java | 5 + .../db/CheckpointBufferDeadlockTest.java | 358 +++++++++++++++++++ 9 files changed, 577 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java index ac713c3..ded37e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java @@ -1292,9 +1292,10 @@ public class IgniteAuthenticationProcessor extends GridProcessorAdapter implemen // Remove failed operation from active operations. activeOps.remove(op.id()); } - - if (sharedCtx != null) - sharedCtx.database().checkpointReadUnlock(); + finally { + if (sharedCtx != null) + sharedCtx.database().checkpointReadUnlock(); + } curOpFinishMsg = msg0; http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 3cfc25f..1eeebae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -727,77 +727,86 @@ public class GridDhtPartitionDemander { try { AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); - ctx.database().checkpointReadLock(); + // Preload. + for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { + int p = e.getKey(); - try { - // Preload. 
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { - int p = e.getKey(); + if (aff.get(p).contains(ctx.localNode())) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); - if (aff.get(p).contains(ctx.localNode())) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true); + assert part != null; - assert part != null; + boolean last = supply.last().containsKey(p); - boolean last = supply.last().containsKey(p); + if (part.state() == MOVING) { + boolean reserved = part.reserve(); - if (part.state() == MOVING) { - boolean reserved = part.reserve(); + assert reserved : "Failed to reserve partition [igniteInstanceName=" + + ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']'; - assert reserved : "Failed to reserve partition [igniteInstanceName=" + - ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']'; + part.lock(); - part.lock(); + try { + Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator(); - try { - // Loop through all received entries and try to preload them. - for (GridCacheEntryInfo entry : e.getValue().infos()) { - if (!preloadEntry(node, p, entry, topVer)) { - if (log.isDebugEnabled()) - log.debug("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - - break; - } + // Loop through all received entries and try to preload them. + while (infos.hasNext()) { + ctx.database().checkpointReadLock(); - for (GridCacheContext cctx : grp.caches()) { - if (cctx.statisticsEnabled()) - cctx.cache().metrics0().onRebalanceKeyReceived(); - } - } + try { + for (int i = 0; i < 100; i++) { + if (!infos.hasNext()) + break; + + GridCacheEntryInfo entry = infos.next(); - // If message was last for this partition, - // then we take ownership. 
- if (last) { - fut.partitionDone(nodeId, p, true); + if (!preloadEntry(node, p, entry, topVer)) { + if (log.isDebugEnabled()) + log.debug("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - if (log.isDebugEnabled()) - log.debug("Finished rebalancing partition: " + part); + break; + } + + for (GridCacheContext cctx : grp.caches()) { + if (cctx.statisticsEnabled()) + cctx.cache().metrics0().onRebalanceKeyReceived(); + } + } + } + finally { + ctx.database().checkpointReadUnlock(); } } - finally { - part.unlock(); - part.release(); + + // If message was last for this partition, + // then we take ownership. + if (last) { + fut.partitionDone(nodeId, p, true); + + if (log.isDebugEnabled()) + log.debug("Finished rebalancing partition: " + part); } } - else { - if (last) - fut.partitionDone(nodeId, p, false); - - if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (state is not MOVING): " + part); + finally { + part.unlock(); + part.release(); } } else { - fut.partitionDone(nodeId, p, false); + if (last) + fut.partitionDone(nodeId, p, false); if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); + log.debug("Skipping rebalancing partition (state is not MOVING): " + part); } } - } - finally { - ctx.database().checkpointReadUnlock(); + else { + fut.partitionDone(nodeId, p, false); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); + } } // Only request partitions based on latest topology version. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 6389942..2a0ba44 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1478,7 +1478,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping.")); } - if (safeToUpdatePageMemories() || checkpointLock.getReadHoldCount() > 1) + if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories()) break; else { checkpointLock.readLock().unlock(); @@ -2542,6 +2542,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, null); + assert tag == null || tag != PageMemoryImpl.TRY_AGAIN_TAG : + "Lock is held by other thread for page " + fullId; + if (tag != null) { tmpWriteBuf.rewind(); @@ -3103,7 +3106,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan chp.cpPages.innerCollection(i), updStores, doneWriteFut, - totalPagesToWriteCnt + totalPagesToWriteCnt, + asyncRunner ); try { @@ -3117,11 +3121,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } else { // Single-threaded checkpoint. 
- Runnable write = new WriteCheckpointPages(tracker, + Runnable write = new WriteCheckpointPages( + tracker, chp.cpPages, updStores, doneWriteFut, - totalPagesToWriteCnt); + totalPagesToWriteCnt, + null); write.run(); } @@ -3752,117 +3758,158 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Pages write task */ private class WriteCheckpointPages implements Runnable { /** */ - private CheckpointMetricsTracker tracker; + private final CheckpointMetricsTracker tracker; /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection */ - private Collection<FullPageId> writePageIds; + private final Collection<FullPageId> writePageIds; /** */ - private ConcurrentLinkedHashMap<PageStore, LongAdder> updStores; + private final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores; /** */ - private CountDownFuture doneFut; + private final CountDownFuture doneFut; /** Total pages to write, counter may be greater than {@link #writePageIds} size */ private final int totalPagesToWrite; + /** If any pages were skipped, new task with remaining pages will be submitted here. */ + private final ExecutorService retryWriteExecutor; + /** - * Creates task for write pages - * - * @param tracker - * @param writePageIds Collection of page IDs to write. - * @param updStores - * @param doneFut - * @param totalPagesToWrite total pages to be written under this checkpoint + * @param tracker Tracker. + * @param writePageIds Write page ids. + * @param updStores Upd stores. + * @param doneFut Done future. + * @param totalPagesToWrite Total pages to write. + * @param retryWriteExecutor Retry write executor. 
*/ private WriteCheckpointPages( final CheckpointMetricsTracker tracker, final Collection<FullPageId> writePageIds, final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores, final CountDownFuture doneFut, - final int totalPagesToWrite) { + final int totalPagesToWrite, + final ExecutorService retryWriteExecutor + ) { this.tracker = tracker; this.writePageIds = writePageIds; this.updStores = updStores; this.doneFut = doneFut; this.totalPagesToWrite = totalPagesToWrite; + this.retryWriteExecutor = retryWriteExecutor; } /** {@inheritDoc} */ @Override public void run() { + snapshotMgr.beforeCheckpointPageWritten(); + + Collection<FullPageId> writePageIds = this.writePageIds; + + try { + List<FullPageId> pagesToRetry = writePages(writePageIds); + + if (pagesToRetry.isEmpty()) + doneFut.onDone((Void)null); + else { + if (retryWriteExecutor == null) { + while (!pagesToRetry.isEmpty()) + pagesToRetry = writePages(pagesToRetry); + + doneFut.onDone((Void)null); + } + else { + // Submit current retry pages to the end of the queue to avoid starvation. + WriteCheckpointPages retryWritesTask = new WriteCheckpointPages( + tracker, pagesToRetry, updStores, doneFut, totalPagesToWrite, retryWriteExecutor); + + retryWriteExecutor.submit(retryWritesTask); + } + } + } + catch (Throwable e) { + doneFut.onDone(e); + } + } + + /** + * @param writePageIds Collections of pages to write. + * @return pagesToRetry Pages which should be retried. 
+ */ + private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException { ByteBuffer tmpWriteBuf = threadBuf.get(); long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf); - snapshotMgr.beforeCheckpointPageWritten(); + List<FullPageId> pagesToRetry = new ArrayList<>(); - try { - for (FullPageId fullId : writePageIds) { - if (checkpointer.shutdownNow) - break; - - tmpWriteBuf.rewind(); + for (FullPageId fullId : writePageIds) { + if (checkpointer.shutdownNow) + break; - snapshotMgr.beforePageWrite(fullId); + tmpWriteBuf.rewind(); - int grpId = fullId.groupId(); + snapshotMgr.beforePageWrite(fullId); - PageMemoryEx pageMem; + int grpId = fullId.groupId(); - if (grpId != MetaStorage.METASTORAGE_CACHE_ID) { - CacheGroupContext grp = context().cache().cacheGroup(grpId); + PageMemoryEx pageMem; - if (grp == null) - continue; + if (grpId != MetaStorage.METASTORAGE_CACHE_ID) { + CacheGroupContext grp = context().cache().cacheGroup(grpId); - if (!grp.dataRegion().config().isPersistenceEnabled()) - continue; + if (grp == null) + continue; - pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); - } - else - pageMem = (PageMemoryEx)metaStorage.pageMemory(); + if (!grp.dataRegion().config().isPersistenceEnabled()) + continue; + pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); + } + else + pageMem = (PageMemoryEx)metaStorage.pageMemory(); - Integer tag = pageMem.getForCheckpoint( - fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null); - if (tag != null) { - assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); - assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId()); + Integer tag = pageMem.getForCheckpoint( + fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? 
tracker : null); - tmpWriteBuf.rewind(); + if (tag != null) { + if (tag == PageMemoryImpl.TRY_AGAIN_TAG) { + pagesToRetry.add(fullId); - if (persStoreMetrics.metricsEnabled()) { - int pageType = PageIO.getType(tmpWriteBuf); + continue; + } - if (PageIO.isDataPageType(pageType)) - tracker.onDataPageWritten(); - } + assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); + assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId()); - if (!skipCrc) { - PageIO.setCrc(writeAddr, PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize())); + tmpWriteBuf.rewind(); - tmpWriteBuf.rewind(); - } + if (persStoreMetrics.metricsEnabled()) { + int pageType = PageIO.getType(tmpWriteBuf); - int curWrittenPages = writtenPagesCntr.incrementAndGet(); + if (PageIO.isDataPageType(pageType)) + tracker.onDataPageWritten(); + } - snapshotMgr.onPageWrite(fullId, tmpWriteBuf, curWrittenPages, totalPagesToWrite); + if (!skipCrc) { + PageIO.setCrc(writeAddr, PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize())); tmpWriteBuf.rewind(); + } - PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false); + int curWrittenPages = writtenPagesCntr.incrementAndGet(); - updStores.computeIfAbsent(store, k -> new LongAdder()).increment(); - } - } + snapshotMgr.onPageWrite(fullId, tmpWriteBuf, curWrittenPages, totalPagesToWrite); - doneFut.onDone((Void)null); - } - catch (Throwable e) { - doneFut.onDone(e); + tmpWriteBuf.rewind(); + + PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false); + + updStores.computeIfAbsent(store, k -> new LongAdder()).increment(); + } } + + return pagesToRetry; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 9861ef9..ea775dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1207,6 +1207,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); dbMgr.checkpointReadLock(); + try { Metas metas = getOrAllocatePartitionMetas(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index a518121..f8f3b57 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -177,6 +177,9 @@ public class PageMemoryImpl implements PageMemoryEx { /** Number of random pages that will be picked for eviction. */ public static final int RANDOM_PAGES_EVICT_NUM = 5; + /** Try again tag. */ + public static final int TRY_AGAIN_TAG = -1; + /** Tracking io. 
*/ private static final TrackingPageIO trackingIO = TrackingPageIO.VERSIONS.latest(); @@ -377,9 +380,9 @@ public class PageMemoryImpl implements PageMemoryEx { if (throttlingPlc == ThrottlingPolicy.SPEED_BASED) writeThrottle = new PagesWriteSpeedBasedThrottle(this, cpProgressProvider, stateChecker, log); else if (throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED) - writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, stateChecker, false); + writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, stateChecker, false, log); else if (throttlingPlc == ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY) - writeThrottle = new PagesWriteThrottle(this, null, stateChecker, true); + writeThrottle = new PagesWriteThrottle(this, null, stateChecker, true, log); } /** {@inheritDoc} */ @@ -1118,7 +1121,7 @@ public class PageMemoryImpl implements PageMemoryEx { } } else - return copyPageForCheckpoint(absPtr, fullId, outBuf, pageSingleAcquire, tracker) ? tag : null; + return copyPageForCheckpoint(absPtr, fullId, outBuf, pageSingleAcquire, tracker) ? tag : TRY_AGAIN_TAG; } /** @@ -1128,6 +1131,8 @@ public class PageMemoryImpl implements PageMemoryEx { * @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be * copied) in case checkpoint temporary buffer is used. * @param tracker Checkpoint statistics tracker. + * + * @return False if someone else holds lock on page. 
*/ private boolean copyPageForCheckpoint( long absPtr, @@ -1139,7 +1144,10 @@ public class PageMemoryImpl implements PageMemoryEx { assert absPtr != 0; assert PageHeader.isAcquired(absPtr); - rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS); + boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS); + + if (!locked) + return false; try { long tmpRelPtr = PageHeader.tempBufferPointer(absPtr); @@ -2346,6 +2354,8 @@ public class PageMemoryImpl implements PageMemoryEx { Integer tag = partGenerationMap.get(new GroupPartitionId(grpId, partId)); + assert tag == null || tag >= 0 : "Negative tag=" + tag; + return tag == null ? INIT_PART_GENERATION : tag; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java index 68fa529..2dd8127 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Throttles threads that generate dirty pages during ongoing checkpoint. 
@@ -113,10 +114,12 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { * @param stateChecker Checkpoint lock state provider. * @param log Logger. */ - public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory, - CheckpointWriteProgressSupplier cpProgress, - CheckpointLockStateChecker stateChecker, - IgniteLogger log) { + public PagesWriteSpeedBasedThrottle( + PageMemoryImpl pageMemory, + CheckpointWriteProgressSupplier cpProgress, + CheckpointLockStateChecker stateChecker, + IgniteLogger log + ) { this.pageMemory = pageMemory; this.cpProgress = cpProgress; totalPages = pageMemory.totalPages(); @@ -229,6 +232,11 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { * @param throttleParkTimeNs the maximum number of nanoseconds to wait */ protected void doPark(long throttleParkTimeNs) { + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + U.warn(log, "Parking thread=" + Thread.currentThread().getName() + + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000)); + } + LockSupport.parkNanos(throttleParkTimeNs); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java index 166cdcd..d5f4bd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.locks.LockSupport; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Throttles threads that generate dirty pages during ongoing checkpoint. @@ -50,21 +52,27 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { /** Counter for checkpoint buffer usage ratio throttling (we need a separate one due to IGNITE-7751). */ private final AtomicInteger inCheckpointBackoffCntr = new AtomicInteger(0); + /** Logger. */ + private IgniteLogger log; + /** * @param pageMemory Page memory. * @param cpProgress Database manager. * @param stateChecker checkpoint lock state checker. * @param throttleOnlyPagesInCheckpoint If true, throttle will only protect from checkpoint buffer overflow. + * @param log Logger. */ public PagesWriteThrottle(PageMemoryImpl pageMemory, CheckpointWriteProgressSupplier cpProgress, CheckpointLockStateChecker stateChecker, - boolean throttleOnlyPagesInCheckpoint + boolean throttleOnlyPagesInCheckpoint, + IgniteLogger log ) { this.pageMemory = pageMemory; this.cpProgress = cpProgress; this.stateChecker = stateChecker; this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint; + this.log = log; if (!throttleOnlyPagesInCheckpoint) assert cpProgress != null : "cpProgress must be not null if ratio based throttling mode is used"; @@ -111,7 +119,14 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { if (shouldThrottle) { int throttleLevel = cntr.getAndIncrement(); - LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel))); + long throttleParkTimeNs = (long) (STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel)); + + if (throttleParkTimeNs > LOGGING_THRESHOLD) { + U.warn(log, "Parking thread=" + 
Thread.currentThread().getName() + + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000)); + } + + LockSupport.parkNanos(throttleParkTimeNs); } else cntr.set(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java index adeaa3d..53a8017 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java @@ -17,10 +17,15 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; +import java.util.concurrent.TimeUnit; + /** * Throttling policy, encapsulates logic of delaying write operations. */ public interface PagesWriteThrottlePolicy { + /** Max park time. */ + public long LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(10); + /** * Callback to apply throttling delay. * @param isPageInCheckpoint flag indicating if current page is in scope of current checkpoint. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java new file mode 100644 index 0000000..2b5a65d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java @@ -0,0 +1,358 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.OpenOption; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.store.PageStore; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; +import 
org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ * Checks that a checkpoint is able to finish while many threads keep page locks under the checkpoint read lock
+ * and checkpoint writes are artificially slowed down.
+ * <p>
+ * Scenario: data is streamed into a single-partition cache with checkpoints disabled, then checkpoints are
+ * re-enabled with a slow file I/O, several loader threads read-lock and write-lock random data pages for a while,
+ * and finally a forced checkpoint must complete.
+ */
+public class CheckpointBufferDeadlockTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the single-partition cache used by the test. */
+    private static final String CACHE_NAME = "single-part";
+
+    /** Max size of the default data region, in bytes. */
+    private static final int MAX_SIZE = 500 * 1024 * 1024;
+
+    /** Checkpoint page buffer size, in bytes. */
+    private static final int CP_BUF_SIZE = 20 * 1024 * 1024;
+
+    /** Slow checkpoint enabled. */
+    private static final AtomicBoolean slowCheckpointEnabled = new AtomicBoolean(false);
+
+    /** Time a checkpoint thread is parked before each write while slow mode is on, in nanoseconds. */
+    private static final int CHECKPOINT_PARK_NANOS = 50_000_000;
+
+    /** Size of the byte array stored as each cache value. */
+    private static final int ENTRY_BYTE_CHUNK_SIZE = 900;
+
+    /** Number of pages each loader iteration touches under the checkpoint read lock. */
+    private static final int PAGES_TOUCHED_UNDER_CP_LOCK = 20;
+
+    /** Stop load flag. */
+    private static final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /** Number of checkpoint threads the node is configured with. */
+    private int checkpointThreads;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setFileIOFactory(new SlowCheckpointFileIOFactory())
+                .setCheckpointThreads(checkpointThreads)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(MAX_SIZE)
+                        .setCheckpointPageBufferSize(CP_BUF_SIZE)
+                )
+        );
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stop.set(false);
+
+        slowCheckpointEnabled.set(false);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stop.set(true);
+
+        slowCheckpointEnabled.set(false);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Runs the scenario with four checkpoint threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFourCheckpointThreads() throws Exception {
+        checkpointThreads = 4;
+
+        runDeadlockScenario();
+    }
+
+    /**
+     * Runs the scenario with a single checkpoint thread.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOneCheckpointThread() throws Exception {
+        checkpointThreads = 1;
+
+        runDeadlockScenario();
+    }
+
+    /**
+     * Loads data with checkpoints disabled, then re-enables checkpoints with slow writes, runs loader threads
+     * that lock pages under the checkpoint read lock and verifies that a forced checkpoint still completes.
+     *
+     * @throws Exception If failed.
+     */
+    private void runDeadlockScenario() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+        FilePageStoreManager pageStoreMgr = (FilePageStoreManager)ig.context().cache().context().pageStore();
+
+        IgniteCache<Object, Object> singlePartCache = ig.getOrCreateCache(new CacheConfiguration<>()
+            .setName(CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction(false, 1)));
+
+        db.enableCheckpoints(false).get();
+
+        Thread.sleep(1_000);
+
+        try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(singlePartCache.getName())) {
+            int entries = MAX_SIZE / ENTRY_BYTE_CHUNK_SIZE / 4;
+
+            for (int i = 0; i < entries; i++)
+                streamer.addData(i, new byte[ENTRY_BYTE_CHUNK_SIZE]);
+
+            streamer.flush();
+        }
+
+        slowCheckpointEnabled.set(true);
+        log.info(">>> Slow checkpoints enabled");
+
+        db.enableCheckpoints(true).get();
+
+        AtomicBoolean fail = new AtomicBoolean(false);
+
+        // Keep the future so that the loaders can be joined before the test method returns.
+        // Fully qualified because the top of the import block is not changed by this edit.
+        org.apache.ignite.internal.IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                int loops = 0;
+
+                while (!stop.get()) {
+                    if (loops % 10 == 0 && loops > 0 && loops < 500 || loops % 500 == 0 && loops >= 500)
+                        log.info("Successfully completed " + loops + " loops");
+
+                    db.checkpointReadLock();
+
+                    try {
+                        touchPages(db, pageStoreMgr);
+                    }
+                    catch (Throwable e) {
+                        log.error("Error in loader thread", e);
+
+                        fail.set(true);
+                    }
+                    finally {
+                        db.checkpointReadUnlock();
+                    }
+
+                    loops++;
+                }
+            }
+        }, 10, "load-runner");
+
+        Thread.sleep(10_000); // Await for the start of throttling.
+
+        slowCheckpointEnabled.set(false);
+        log.info(">>> Slow checkpoints disabled");
+
+        assertFalse(fail.get());
+
+        forceCheckpoint(); // Previous checkpoint should eventually finish.
+
+        // Stop and join the loaders so that no thread still works with page memory when the node is stopped.
+        stop.set(true);
+
+        loadFut.get();
+
+        // Loaders kept running after the first check, so errors raised since then must be reported as well.
+        assertFalse(fail.get());
+    }
+
+    /**
+     * Performs one loader iteration: read-locks the first half of randomly picked data pages all at once, emulates
+     * writes to the second half and then releases everything. Must be called under the checkpoint read lock.
+     *
+     * @param db Database manager.
+     * @param pageStoreMgr Page store manager.
+     * @throws Exception If failed.
+     */
+    private void touchPages(GridCacheDatabaseSharedManager db, FilePageStoreManager pageStoreMgr) throws Exception {
+        int grpId = CU.cacheId(CACHE_NAME);
+
+        PageStore store = pageStoreMgr.getStore(grpId, 0);
+
+        int pages = store.pages();
+
+        DataRegion region = db.dataRegion(DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME);
+
+        PageMemoryImpl pageMem = (PageMemoryImpl)region.pageMemory();
+
+        List<FullPageId> pickedPages = pickSortedPages(grpId, pages);
+
+        int half = PAGES_TOUCHED_UNDER_CP_LOCK / 2;
+
+        List<Long> readLockedPages = new ArrayList<>(half);
+
+        try {
+            // Read lock many pages at once intentionally.
+            for (int i = 0; i < half; i++) {
+                FullPageId fpid = pickedPages.get(i);
+
+                long page = pageMem.acquirePage(fpid.groupId(), fpid.pageId());
+
+                long abs = pageMem.readLock(fpid.groupId(), fpid.pageId(), page);
+
+                // Zero is treated as "lock was not taken" (NOTE(review): confirm against PageMemoryImpl),
+                // so only the page reference is given back before the failure is reported.
+                if (abs == 0)
+                    pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
+
+                assertFalse(fpid.toString(), abs == 0);
+
+                readLockedPages.add(page);
+            }
+
+            // Emulate writes to trigger throttling.
+            for (int i = half; i < PAGES_TOUCHED_UNDER_CP_LOCK; i++) {
+                FullPageId fpid = pickedPages.get(i);
+
+                long page = pageMem.acquirePage(fpid.groupId(), fpid.pageId());
+
+                try {
+                    long abs = pageMem.writeLock(fpid.groupId(), fpid.pageId(), page);
+
+                    assertFalse(fpid.toString(), abs == 0);
+
+                    pageMem.writeUnlock(fpid.groupId(), fpid.pageId(), page, null, true);
+                }
+                finally {
+                    pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
+                }
+            }
+        }
+        finally {
+            // Release exactly the pages that were successfully read-locked, even if an assertion failed above.
+            for (int i = 0; i < readLockedPages.size(); i++) {
+                FullPageId fpid = pickedPages.get(i);
+
+                long page = readLockedPages.get(i);
+
+                pageMem.readUnlock(fpid.groupId(), fpid.pageId(), page);
+
+                pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
+            }
+        }
+    }
+
+    /**
+     * Picks {@link #PAGES_TOUCHED_UNDER_CP_LOCK} distinct random data pages of partition 0 and returns them sorted.
+     *
+     * @param grpId Cache group id.
+     * @param pages Number of pages reported by the partition page store.
+     * @return Picked pages ordered by group id and effective page id.
+     */
+    private List<FullPageId> pickSortedPages(int grpId, int pages) {
+        // Indexes are drawn from [N, pages - N), so at least N distinct values must exist in that range;
+        // otherwise nextInt() throws or the loop below never terminates.
+        assertTrue("Too few pages in the store: " + pages, pages >= 3 * PAGES_TOUCHED_UNDER_CP_LOCK);
+
+        Set<FullPageId> pickedPagesSet = new HashSet<>();
+
+        while (pickedPagesSet.size() < PAGES_TOUCHED_UNDER_CP_LOCK) {
+            int pageIdx = ThreadLocalRandom.current().nextInt(
+                PAGES_TOUCHED_UNDER_CP_LOCK, pages - PAGES_TOUCHED_UNDER_CP_LOCK);
+
+            long pageId = PageIdUtils.pageId(0, PageIdAllocator.FLAG_DATA, pageIdx);
+
+            pickedPagesSet.add(new FullPageId(pageId, grpId));
+        }
+
+        List<FullPageId> pickedPages = new ArrayList<>(pickedPagesSet);
+
+        assertEquals(PAGES_TOUCHED_UNDER_CP_LOCK, pickedPages.size());
+
+        // Sort to avoid deadlocks on pages rw-locks.
+        pickedPages.sort(new Comparator<FullPageId>() {
+            @Override public int compare(FullPageId o1, FullPageId o2) {
+                int cmp = Long.compare(o1.groupId(), o2.groupId());
+
+                if (cmp != 0)
+                    return cmp;
+
+                return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
+                    PageIdUtils.effectivePageId(o2.pageId()));
+            }
+        });
+
+        return pickedPages;
+    }
+
+    /**
+     * Create File I/O that emulates poor checkpoint write speed.
+     */
+    private static class SlowCheckpointFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, CREATE, READ, WRITE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... openOption) throws IOException {
+            final FileIO delegate = delegateFactory.create(file, openOption);
+
+            // Every write variant parks first, then forwards to the real file I/O.
+            return new FileIODecorator(delegate) {
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
+                    parkIfNeeded();
+
+                    return delegate.write(srcBuf);
+                }
+
+                @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+                    parkIfNeeded();
+
+                    return delegate.write(srcBuf, position);
+                }
+
+                @Override public int write(byte[] buf, int off, int len) throws IOException {
+                    parkIfNeeded();
+
+                    return delegate.write(buf, off, len);
+                }
+
+                /**
+                 * Parks current checkpoint thread if slow mode is enabled.
+                 * Threads whose name does not contain "checkpoint" are never delayed.
+                 */
+                private void parkIfNeeded() {
+                    if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint"))
+                        LockSupport.parkNanos(CHECKPOINT_PARK_NANOS);
+                }
+
+                /** {@inheritDoc} */
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
+                }
+            };
+        }
+    }
+
+
+}
