Repository: ignite Updated Branches: refs/heads/master b2531569d -> b2f8cf8f4
IGNITE-7606 Write replaced page outside segment write lock - Fixes #3469. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b2f8cf8f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b2f8cf8f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b2f8cf8f Branch: refs/heads/master Commit: b2f8cf8f436e544911b110ad6e2643329af99d4f Parents: b253156 Author: dpavlov <[email protected]> Authored: Thu Feb 8 18:14:00 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Feb 8 21:46:26 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 6 + .../internal/pagemem/PageIdAllocator.java | 3 +- .../CheckpointWriteProgressSupplier.java | 45 +++ .../GridCacheDatabaseSharedManager.java | 53 ++-- .../pagemem/DelayedDirtyPageWrite.java | 105 +++++++ .../pagemem/DelayedPageReplacementTracker.java | 198 ++++++++++++ .../persistence/pagemem/PageMemoryImpl.java | 148 +++++---- .../pagemem/PagesWriteSpeedBasedThrottle.java | 30 +- .../persistence/pagemem/PagesWriteThrottle.java | 24 +- .../persistence/pagemem/ReplacedPageWriter.java | 35 +++ .../checkpoint/IgniteMassLoadSandboxTest.java | 4 +- .../pagemem/BPlusTreePageMemoryImplTest.java | 19 +- .../BPlusTreeReuseListPageMemoryImplTest.java | 18 +- ...gnitePageMemReplaceDelayedWriteUnitTest.java | 307 +++++++++++++++++++ .../pagemem/IgniteThrottlingUnitTest.java | 22 +- .../pagemem/IndexStoragePageMemoryImplTest.java | 18 +- .../pagemem/PageMemoryImplNoLoadTest.java | 12 +- .../persistence/pagemem/PageMemoryImplTest.java | 13 +- .../testsuites/IgnitePdsUnitTestSuite.java | 4 +- 19 files changed, 891 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 2b221a1..8e72099 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -811,6 +811,12 @@ public final class IgniteSystemProperties { public static final String IGNITE_DEV_ONLY_LOGGING_DISABLED = "IGNITE_DEV_ONLY_LOGGING_DISABLED"; /** + * When set to {@code true} (default), pages are written to page store without holding segment lock (with delay). + * Because other thread may require exactly the same page to be loaded from store, reads are protected by locking. + */ + public static final String IGNITE_DELAYED_REPLACED_PAGE_WRITE = "IGNITE_DELAYED_REPLACED_PAGE_WRITE"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java index dc504fe..ae7da14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java @@ -38,6 +38,7 @@ public interface PageIdAllocator { /** * Allocates a page from the space for the given partition ID and the given flags. * + * @param cacheId Cache Group ID. * @param partId Partition ID. * @return Allocated page ID. */ @@ -46,7 +47,7 @@ public interface PageIdAllocator { /** * The given page is free now. * - * @param cacheId Cache ID. + * @param cacheId Cache Group ID. * @param pageId Page ID. */ public boolean freePage(int cacheId, long pageId) throws IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java new file mode 100644 index 0000000..d20e8d4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointWriteProgressSupplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Provider for counters of checkpoint writes progress. + */ +public interface CheckpointWriteProgressSupplier { + /** + * Counter for written checkpoint pages. Not null only if checkpoint is running. + */ + public AtomicInteger writtenPagesCounter(); + + /** + * @return Counter for fsynced checkpoint pages. Not null only if checkpoint is running. + */ + public AtomicInteger syncedPagesCounter(); + + /** + * @return Counter for evicted pages during current checkpoint. Not null only if checkpoint is running. + */ + public AtomicInteger evictedPagesCntr(); + + /** + * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0. + */ + public int currentCheckpointPagesCount(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 1de48f6..e53bd44 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -170,7 +170,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.metastorag * */ @SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"}) -public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager { +public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager implements CheckpointWriteProgressSupplier { /** */ public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC"; @@ -361,7 +361,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Counter for fsynced checkpoint pages. Not null only if checkpoint is running. */ private volatile AtomicInteger syncedPagesCntr = null; - /** Counter for evictted checkpoint pages. Not null only if checkpoint is running. */ + /** Counter for evicted checkpoint pages. Not null only if checkpoint is running. */ private volatile AtomicInteger evictedPagesCntr = null; /** Number of pages in current checkpoint at the beginning of checkpoint. */ @@ -980,28 +980,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan ), cctx, memCfg.getPageSize(), - new GridInClosure3X<FullPageId, ByteBuffer, Integer>() { - @Override public void applyx( - FullPageId fullId, - ByteBuffer pageBuf, - Integer tag - ) throws IgniteCheckedException { - // First of all, write page to disk. - storeMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, tag); + (fullId, pageBuf, tag) -> { + // First of all, write page to disk. + storeMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, tag); - // Only after write we can write page into snapshot. - snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag); + // Only after write we can write page into snapshot. + snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag); - AtomicInteger cntr = evictedPagesCntr; + AtomicInteger cntr = evictedPagesCntr; - if (cntr != null) - cntr.incrementAndGet(); - } + if (cntr != null) + cntr.incrementAndGet(); }, changeTracker, this, memMetrics, - plc + plc, + this ); memMetrics.pageMemory(pageMem); @@ -2627,31 +2622,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - /** - * Counter for written checkpoint pages. Not null only if checkpoint is running. - */ - public AtomicInteger writtenPagesCounter() { + /** {@inheritDoc} */ + @Override public AtomicInteger writtenPagesCounter() { return writtenPagesCntr; } - /** - * @return Counter for fsynced checkpoint pages. Not null only if checkpoint is running. - */ - public AtomicInteger syncedPagesCounter() { + /** {@inheritDoc} */ + @Override public AtomicInteger syncedPagesCounter() { return syncedPagesCntr; } - /** - * @return Counter for evicted pages during current checkpoint. Not null only if checkpoint is running. - */ - public AtomicInteger evictedPagesCntr() { + /** {@inheritDoc} */ + @Override public AtomicInteger evictedPagesCntr() { return evictedPagesCntr; } - /** - * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0. - */ - public int currentCheckpointPagesCount() { + /** {@inheritDoc} */ + @Override public int currentCheckpointPagesCount() { return currCheckpointPagesCnt; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java new file mode 100644 index 0000000..6eec609 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.util.GridUnsafe; +import org.jetbrains.annotations.Nullable; + +/** + * Not thread safe and stateful class for replacement of one page with write() delay. This allows to write page content + * without holding segment lock. Page data is copied into temp buffer during {@link #writePage(FullPageId, ByteBuffer, + * int)} and then sent to real implementation by {@link #finishReplacement()}. + */ +public class DelayedDirtyPageWrite implements ReplacedPageWriter { + /** Real flush dirty page implementation. */ + private final ReplacedPageWriter flushDirtyPage; + + /** Page size. */ + private final int pageSize; + + /** Thread local with byte buffers. */ + private final ThreadLocal<ByteBuffer> byteBufThreadLoc; + + /** Replacing pages tracker, used to register & unregister pages being written. */ + private final DelayedPageReplacementTracker tracker; + + /** Full page id to be written on {@link #finishReplacement()} or null if nothing to write. */ + @Nullable private FullPageId fullPageId; + + /** Byte buffer with page data to be written on {@link #finishReplacement()} or null if nothing to write. */ + @Nullable private ByteBuffer byteBuf; + + /** Partition update tag to be used in{@link #finishReplacement()} or null if -1 to write. */ + private int tag = -1; + + /** + * @param flushDirtyPage real writer to save page to store. + * @param byteBufThreadLoc thread local buffers to use for pages copying. + * @param pageSize page size. + * @param tracker tracker to lock/unlock page reads. + */ + public DelayedDirtyPageWrite(ReplacedPageWriter flushDirtyPage, + ThreadLocal<ByteBuffer> byteBufThreadLoc, int pageSize, + DelayedPageReplacementTracker tracker) { + this.flushDirtyPage = flushDirtyPage; + this.pageSize = pageSize; + this.byteBufThreadLoc = byteBufThreadLoc; + this.tracker = tracker; + } + + /** {@inheritDoc} */ + @Override public void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) { + tracker.lock(fullPageId); + + ByteBuffer tlb = byteBufThreadLoc.get(); + + tlb.rewind(); + + long writeAddr = GridUnsafe.bufferAddress(tlb); + long origBufAddr = GridUnsafe.bufferAddress(byteBuf); + + GridUnsafe.copyMemory(origBufAddr, writeAddr, pageSize); + + this.fullPageId = fullPageId; + this.byteBuf = tlb; + this.tag = tag; + } + + /** + * Runs actual write if required. Method is 'no op' if there was no page selected for replacement. + * @throws IgniteCheckedException if write failed. + */ + public void finishReplacement() throws IgniteCheckedException { + if (byteBuf == null && fullPageId == null) + return; + + try { + flushDirtyPage.writePage(fullPageId, byteBuf, tag); + } + finally { + tracker.unlock(fullPageId); + + fullPageId = null; + byteBuf = null; + tag = -1; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java new file mode 100644 index 0000000..9cf5b77 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Collection; +import java.util.HashSet; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.FullPageId; + +/** + * Delayed page writes tracker. Provides delayed write implementations and allows to check if page is actually being + * written to page store. + */ +public class DelayedPageReplacementTracker { + /** Page size. */ + private final int pageSize; + + /** Flush dirty page real implementation. */ + private final ReplacedPageWriter flushDirtyPage; + + /** Logger. */ + private final IgniteLogger log; + + /** Lock stripes for pages read protection. */ + private final Stripe[] stripes; + + /** Byte buffer thread local. */ + private final ThreadLocal<ByteBuffer> byteBufThreadLoc + = new ThreadLocal<ByteBuffer>() { + @Override protected ByteBuffer initialValue() { + ByteBuffer buf = ByteBuffer.allocateDirect(pageSize); + + buf.order(ByteOrder.LITTLE_ENDIAN); + + return buf; + } + }; + + /** + * Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageWrite} is + * stateful and not thread safe, this thread local protects from GC pressure on pages replacement. + */ + private final ThreadLocal<DelayedDirtyPageWrite> delayedPageWriteThreadLoc + = new ThreadLocal<DelayedDirtyPageWrite>() { + @Override protected DelayedDirtyPageWrite initialValue() { + return new DelayedDirtyPageWrite(flushDirtyPage, byteBufThreadLoc, pageSize, + DelayedPageReplacementTracker.this); + } + }; + + /** + * @param pageSize Page size. + * @param flushDirtyPage Flush dirty page. + * @param log Logger. + * @param segmentCnt Segments count. + */ + public DelayedPageReplacementTracker(int pageSize, ReplacedPageWriter flushDirtyPage, + IgniteLogger log, int segmentCnt) { + this.pageSize = pageSize; + this.flushDirtyPage = flushDirtyPage; + this.log = log; + stripes = new Stripe[segmentCnt]; + + for (int i = 0; i < stripes.length; i++) + stripes[i] = new Stripe(); + } + + /** + * @return delayed page write implementation, finish method to be called to actually write page. + */ + public DelayedDirtyPageWrite delayedPageWrite() { + return delayedPageWriteThreadLoc.get(); + } + + /** + * @param id Full page ID + * @return stripe related to current page identifier. + */ + private Stripe stripe(FullPageId id) { + int segmentIdx = PageMemoryImpl.segmentIndex(id.groupId(), id.pageId(), stripes.length); + + return stripes[segmentIdx]; + } + + /** + * @param id full page ID to lock from read + */ + public void lock(FullPageId id) { + stripe(id).lock(id); + } + + /** + * Method is returned when page is available to be loaded from store, or waits for replacement finish. + * + * @param id full page ID to be loaded from store. + */ + public void waitUnlock(FullPageId id) { + stripe(id).waitUnlock(id); + } + + /** + * @param id full page ID, which write has been finished, it is available for reading. + */ + public void unlock(FullPageId id) { + stripe(id).unlock(id); + } + + /** + * Stripe for locking pages from reading from store in parallel with not finished write. + */ + private class Stripe { + /** + * Page IDs which are locked for reading from store. Page content is being written right now. guarded by + * collection object monitor. + */ + private final Collection<FullPageId> locked = new HashSet<>(Runtime.getRuntime().availableProcessors() * 2); + + /** + * Has locked pages, flag for fast check if there are some pages, what were replaced and is being written. Write + * to field is guarded by {@link #locked} monitor. + */ + private volatile boolean hasLockedPages; + + /** + * @param id full page ID to lock from read + */ + public void lock(FullPageId id) { + synchronized (locked) { + hasLockedPages = true; + + boolean add = locked.add(id); + + assert add : "Double locking of page for replacement is not possible"; + } + } + + /** + * Method is returned when page is available to be loaded from store, or waits for replacement finish. + * + * @param id full page ID to be loaded from store. + */ + public void waitUnlock(FullPageId id) { + if (!hasLockedPages) + return; + + synchronized (locked) { + if (!hasLockedPages) + return; + + while (locked.contains(id)) { + if (log.isDebugEnabled()) + log.debug("Found replaced page [" + id + "] which is being written to page store, wait for finish replacement"); + + try { + locked.wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + } + } + } + + /** + * @param id full page ID, which write has been finished, it is available for reading. + */ + public void unlock(FullPageId id) { + synchronized (locked) { + boolean rmv = locked.remove(id); + + assert rmv : "Unlocking page ID never locked, id " + id; + + if (locked.isEmpty()) + hasLockedPages = false; + + locked.notifyAll(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index e4c369d..c9670bc 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -59,8 +59,8 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -85,6 +85,8 @@ import org.jetbrains.annotations.Nullable; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE; +import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer; /** @@ -181,7 +183,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** Shared context. */ private final GridCacheSharedContext<?, ?> ctx; - /** State checker. */ + /** Checkpoint lock state provider. */ private final CheckpointLockStateChecker stateChecker; /** Number of used pages in checkpoint buffer. */ @@ -217,10 +219,17 @@ public class PageMemoryImpl implements PageMemoryEx { private OffheapReadWriteLock rwLock; /** Flush dirty page closure. When possible, will be called by evictPage(). */ - private final GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage; + private final ReplacedPageWriter flushDirtyPage; /** - * Flush dirty page closure. When possible, will be called by evictPage(). + * Delayed page replacement (rotation with disk) tracker. Because other thread may require exactly the same page to be loaded from store, + * reads are protected by locking. + * {@code Null} if delayed write functionality is disabled. + */ + @Nullable private final DelayedPageReplacementTracker delayedPageReplacementTracker; + + /** + * Callback invoked to track changes in pages. * {@code Null} if page tracking functionality is disabled * */ @Nullable private final GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker; @@ -231,35 +240,44 @@ public class PageMemoryImpl implements PageMemoryEx { /** Write throttle type. */ private ThrottlingPolicy throttlingPlc; - /** */ - private boolean pageEvictWarned; + /** Checkpoint progress provider. Null disables throttling. */ + @Nullable private final CheckpointWriteProgressSupplier cpProgressProvider; + + /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */ + private volatile boolean pageReplacementWarned; /** */ private long[] sizes; - /** */ + /** Memory metrics to track dirty pages count and page replace rate. */ private DataRegionMetricsImpl memMetrics; /** * @param directMemoryProvider Memory allocator to use. + * @param sizes segments sizes, last is checkpoint pool size. * @param ctx Cache shared context. * @param pageSize Page size. - * @param flushDirtyPage Callback invoked when a dirty page is evicted. + * @param flushDirtyPage write callback invoked when a dirty page is removed for replacement. * @param changeTracker Callback invoked to track changes in pages. - * @param throttlingPlc Write throttle enabled and type. + * @param stateChecker Checkpoint lock state provider. Used to ensure lock is held by thread, which modify pages. + * @param memMetrics Memory metrics to track dirty pages count and page replace rate. + * @param throttlingPlc Write throttle enabled and its type. Null equal to none. + * @param cpProgressProvider checkpoint progress, base for throttling. Null disables throttling. */ public PageMemoryImpl( DirectMemoryProvider directMemoryProvider, long[] sizes, GridCacheSharedContext<?, ?> ctx, int pageSize, - GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage, + ReplacedPageWriter flushDirtyPage, @Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker, CheckpointLockStateChecker stateChecker, DataRegionMetricsImpl memMetrics, - ThrottlingPolicy throttlingPlc + @Nullable ThrottlingPolicy throttlingPlc, + @Nullable CheckpointWriteProgressSupplier cpProgressProvider ) { assert ctx != null; + assert pageSize > 0; log = ctx.logger(PageMemoryImpl.class); @@ -267,9 +285,14 @@ public class PageMemoryImpl implements PageMemoryEx { this.directMemoryProvider = directMemoryProvider; this.sizes = sizes; this.flushDirtyPage = flushDirtyPage; + delayedPageReplacementTracker = + getBoolean(IGNITE_DELAYED_REPLACED_PAGE_WRITE, true) + ? new DelayedPageReplacementTracker(pageSize, flushDirtyPage, log, sizes.length - 1) : + null; this.changeTracker = changeTracker; this.stateChecker = stateChecker; - this.throttlingPlc = throttlingPlc; + this.throttlingPlc = throttlingPlc != null ? throttlingPlc : ThrottlingPolicy.NONE; + this.cpProgressProvider = cpProgressProvider; storeMgr = ctx.pageStore(); walMgr = ctx.wal(); @@ -340,21 +363,19 @@ public class PageMemoryImpl implements PageMemoryEx { * */ private void initWriteThrottle() { - if (!(ctx.database() instanceof GridCacheDatabaseSharedManager)) { - log.error("Write throttle can't start. Unexpected class of database manager: " + - ctx.database().getClass()); + if (!isThrottlingEnabled()) + return; + + if (cpProgressProvider == null) { + log.error("Write throttle can't start. CP progress provider not presented"); throttlingPlc = ThrottlingPolicy.NONE; } - if (isThrottlingEnabled()) { - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.database(); - - if (throttlingPlc == ThrottlingPolicy.SPEED_BASED) - writeThrottle = new PagesWriteSpeedBasedThrottle(this, db, log); - else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED) - writeThrottle = new PagesWriteThrottle(this, db); - } + if (throttlingPlc == ThrottlingPolicy.SPEED_BASED) + writeThrottle = new PagesWriteSpeedBasedThrottle(this, cpProgressProvider, stateChecker, log); + else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED) + writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, stateChecker); } /** {@inheritDoc} */ @@ -435,7 +456,7 @@ public class PageMemoryImpl implements PageMemoryEx { flags == PageIdAllocator.FLAG_IDX && partId == PageIdAllocator.INDEX_PARTITION : "flags = " + flags + ", partId = " + partId; - assert ctx.database().checkpointLockIsHeldByThread(); + assert stateChecker.checkpointLockIsHeldByThread(); if (isThrottlingEnabled()) writeThrottle.onMarkDirty(false); @@ -449,6 +470,9 @@ public class PageMemoryImpl implements PageMemoryEx { // because there is no crc inside them. Segment seg = segment(cacheId, pageId); + DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null + ? delayedPageReplacementTracker.delayedPageWrite() : null; + FullPageId fullId = new FullPageId(pageId, cacheId); seg.writeLock().lock(); @@ -471,7 +495,7 @@ public class PageMemoryImpl implements PageMemoryEx { relPtr = seg.borrowOrAllocateFreePage(pageId); if (relPtr == INVALID_REL_PTR) - relPtr = seg.evictPage(); + relPtr = seg.removePageForReplacement(delayedWriter == null ? flushDirtyPage : delayedWriter); long absPtr = seg.absolute(relPtr); @@ -529,6 +553,9 @@ public class PageMemoryImpl implements PageMemoryEx { } finally { seg.writeLock().unlock(); + + if (delayedWriter != null) + delayedWriter.finishReplacement(); } //we have allocated 'tracking' page, we need to allocate regular one @@ -619,6 +646,9 @@ public class PageMemoryImpl implements PageMemoryEx { seg.readLock().unlock(); } + DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null + ? delayedPageReplacementTracker.delayedPageWrite() : null; + seg.writeLock().lock(); try { @@ -637,7 +667,7 @@ public class PageMemoryImpl implements PageMemoryEx { relPtr = seg.borrowOrAllocateFreePage(pageId); if (relPtr == INVALID_REL_PTR) - relPtr = seg.evictPage(); + relPtr = seg.removePageForReplacement(delayedWriter == null ? flushDirtyPage : delayedWriter); absPtr = seg.absolute(relPtr); @@ -664,6 +694,9 @@ public class PageMemoryImpl implements PageMemoryEx { try { ByteBuffer buf = wrapPointer(pageAddr, pageSize()); + if (delayedPageReplacementTracker != null) + delayedPageReplacementTracker.waitUnlock(fullId); + storeMgr.read(cacheId, pageId, buf); } catch (IgniteDataIntegrityViolationException ignore) { @@ -716,6 +749,9 @@ public class PageMemoryImpl implements PageMemoryEx { } finally { seg.writeLock().unlock(); + + if (delayedWriter != null) + delayedWriter.finishReplacement(); } } @@ -1508,7 +1544,7 @@ public class PageMemoryImpl implements PageMemoryEx { boolean wasDirty = PageHeader.dirty(absPtr, dirty); if (dirty) { - assert ctx.database().checkpointLockIsHeldByThread(); + assert stateChecker.checkpointLockIsHeldByThread(); if (!wasDirty || forceAdd) { boolean added = segment(pageId.groupId(), pageId.pageId()).dirtyPages.add(pageId); @@ -1890,14 +1926,15 @@ public class PageMemoryImpl implements PageMemoryEx { } /** - * Prepares a page for eviction, if needed. + * Prepares a page removal for page replacement, if needed. * * @param fullPageId Candidate page full ID. * @param absPtr Absolute pointer of the page to evict. - * @return {@code True} if it is ok to evict this page, {@code false} if another page should be selected. + * @param saveDirtyPage implementation to save dirty page to persistent storage. + * @return {@code True} if it is ok to replace this page, {@code false} if another page should be selected. * @throws IgniteCheckedException If failed to write page to the underlying store during eviction. */ - private boolean prepareEvict(FullPageId fullPageId, long absPtr) throws IgniteCheckedException { + private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException { assert writeLock().isHeldByCurrentThread(); // Do not evict cache meta pages. @@ -1919,7 +1956,7 @@ public class PageMemoryImpl implements PageMemoryEx { memMetrics.updatePageReplaceRate(U.currentTimeMillis() - PageHeader.readTimestamp(absPtr)); - flushDirtyPage.applyx( + saveDirtyPage.writePage( fullPageId, wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()), partTag( @@ -1984,16 +2021,17 @@ public class PageMemoryImpl implements PageMemoryEx { } /** - * Evict random oldest page from memory to storage. + * Removes random oldest page for page replacement from memory to storage. * - * @return Relative address for evicted page. + * @return Relative address for removed page, now it can be replaced by allocated or reloaded page. * @throws IgniteCheckedException If failed to evict page. + * @param saveDirtyPage Replaced page writer, implementation to save dirty page to persistent storage. */ - private long evictPage() throws IgniteCheckedException { + private long removePageForReplacement(ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException { assert getWriteHoldCount() > 0; - if (!pageEvictWarned) { - pageEvictWarned = true; + if (!pageReplacementWarned) { + pageReplacementWarned = true; U.warn(log, "Page evictions started, this will affect storage performance (consider increasing " + "DataRegionConfiguration#setMaxSize)."); @@ -2022,7 +2060,7 @@ public class PageMemoryImpl implements PageMemoryEx { // every time the same page may be found. Set<Long> ignored = null; - long relEvictAddr = INVALID_REL_PTR; + long relRmvAddr = INVALID_REL_PTR; int iterations = 0; @@ -2068,7 +2106,7 @@ public class PageMemoryImpl implements PageMemoryEx { boolean skip = ignored != null && ignored.contains(rndAddr); - if (relEvictAddr == rndAddr || pinned || skip) { + if (relRmvAddr == rndAddr || pinned || skip) { i--; continue; @@ -2095,33 +2133,30 @@ public class PageMemoryImpl implements PageMemoryEx { metaTs = pageTs; } - if (cleanAddr != INVALID_REL_PTR) { - relEvictAddr = cleanAddr; - } - else if (dirtyAddr != INVALID_REL_PTR) { - relEvictAddr = dirtyAddr; - } - else { - relEvictAddr = metaAddr; - } + if (cleanAddr != INVALID_REL_PTR) + relRmvAddr = cleanAddr; + else if (dirtyAddr != INVALID_REL_PTR) + relRmvAddr = dirtyAddr; + else + relRmvAddr = metaAddr; } - assert relEvictAddr != INVALID_REL_PTR; + assert relRmvAddr != INVALID_REL_PTR; - final long absEvictAddr = absolute(relEvictAddr); + final long absRmvAddr = absolute(relRmvAddr); - final FullPageId fullPageId = PageHeader.fullPageId(absEvictAddr); + final FullPageId fullPageId = PageHeader.fullPageId(absRmvAddr); - if (!prepareEvict(fullPageId, absEvictAddr)) { + if (!preparePageRemoval(fullPageId, absRmvAddr, saveDirtyPage)) { if (iterations > 10) { if (ignored == null) ignored = new HashSet<>(); - ignored.add(relEvictAddr); + ignored.add(relRmvAddr); } if (iterations > pool.pages() * FULL_SCAN_THRESHOLD) - return tryToFindSequentially(cap); + return tryToFindSequentially(cap, saveDirtyPage); continue; } @@ -2135,7 +2170,7 @@ public class PageMemoryImpl implements PageMemoryEx { ) ); - return relEvictAddr; + return relRmvAddr; } } @@ -2165,8 +2200,9 @@ public class PageMemoryImpl implements PageMemoryEx { * Will scan all segment pages to find one to evict it * * @param cap Capacity. + * @param saveDirtyPage Evicted page writer. */ - private long tryToFindSequentially(int cap) throws IgniteCheckedException { + private long tryToFindSequentially(int cap, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException { assert getWriteHoldCount() > 0; long prevAddr = INVALID_REL_PTR; @@ -2201,7 +2237,7 @@ public class PageMemoryImpl implements PageMemoryEx { final FullPageId fullPageId = PageHeader.fullPageId(absEvictAddr); - if (prepareEvict(fullPageId, absEvictAddr)) { + if (preparePageRemoval(fullPageId, absEvictAddr, saveDirtyPage)) { loadedPages.remove( fullPageId.groupId(), PageIdUtils.effectivePageId(fullPageId.pageId()), http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java index cb19eca..aaf5471 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java @@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.internal.util.GridConcurrentHashSet; /** @@ -37,7 +38,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { private final PageMemoryImpl pageMemory; /** Database manager. */ - private final GridCacheDatabaseSharedManager dbSharedMgr; + private final CheckpointWriteProgressSupplier cpProgress; /** Starting throttle time. Limits write speed to 1000 MB/s. */ private static final long STARTING_THROTTLE_NANOS = 4000; @@ -91,6 +92,9 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { /** Total pages which is possible to store in page memory. */ private long totalPages; + /** Checkpoint lock state provider. */ + private CheckpointLockStateChecker cpLockStateChecker; + /** Logger. */ private IgniteLogger log; @@ -105,22 +109,26 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { /** * @param pageMemory Page memory. - * @param dbSharedMgr Database manager. + * @param cpProgress Database manager. + * @param stateChecker Checkpoint lock state provider. * @param log Logger. */ public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory, - GridCacheDatabaseSharedManager dbSharedMgr, IgniteLogger log) { + CheckpointWriteProgressSupplier cpProgress, + CheckpointLockStateChecker stateChecker, + IgniteLogger log) { this.pageMemory = pageMemory; - this.dbSharedMgr = dbSharedMgr; + this.cpProgress = cpProgress; totalPages = pageMemory.totalPages(); + this.cpLockStateChecker = stateChecker; this.log = log; } /** {@inheritDoc} */ @Override public void onMarkDirty(boolean isPageInCheckpoint) { - assert dbSharedMgr.checkpointLockIsHeldByThread(); + assert cpLockStateChecker.checkpointLockIsHeldByThread(); - AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); + AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); if (writtenPagesCntr == null) { speedForMarkAll = 0; @@ -226,7 +234,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { * @return number of written pages. */ private int cpWrittenPages() { - AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); + AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); return writtenPagesCntr == null ? 0 : writtenPagesCntr.get(); } @@ -235,14 +243,14 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { * @return Number of pages in current checkpoint. */ private int cpTotalPages() { - return dbSharedMgr.currentCheckpointPagesCount(); + return cpProgress.currentCheckpointPagesCount(); } /** * @return Counter for fsynced checkpoint pages. */ private int cpSyncedPages() { - AtomicInteger syncedPagesCntr = dbSharedMgr.syncedPagesCounter(); + AtomicInteger syncedPagesCntr = cpProgress.syncedPagesCounter(); return syncedPagesCntr == null ? 0 : syncedPagesCntr.get(); } @@ -251,7 +259,7 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy { * @return number of evicted pages. */ private int cpEvictedPages() { - AtomicInteger evictedPagesCntr = dbSharedMgr.evictedPagesCntr(); + AtomicInteger evictedPagesCntr = cpProgress.evictedPagesCntr(); return evictedPagesCntr == null ? 0 : evictedPagesCntr.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java index 9206935..78e5344 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java @@ -18,7 +18,8 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; /** * Throttles threads that generate dirty pages during ongoing checkpoint. @@ -29,7 +30,10 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { private final PageMemoryImpl pageMemory; /** Database manager. */ - private final GridCacheDatabaseSharedManager dbSharedMgr; + private final CheckpointWriteProgressSupplier cpProgress; + + /** Checkpoint lock state checker. */ + private CheckpointLockStateChecker stateChecker; /** Starting throttle time. Limits write speed to 1000 MB/s. */ private static final long STARTING_THROTTLE_NANOS = 4000; @@ -41,18 +45,22 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0); /** * @param pageMemory Page memory. - * @param dbSharedMgr Database manager. + * @param cpProgress Database manager. + * @param stateChecker checkpoint lock state checker. */ - public PagesWriteThrottle(PageMemoryImpl pageMemory, GridCacheDatabaseSharedManager dbSharedMgr) { + public PagesWriteThrottle(PageMemoryImpl pageMemory, + CheckpointWriteProgressSupplier cpProgress, + CheckpointLockStateChecker stateChecker) { this.pageMemory = pageMemory; - this.dbSharedMgr = dbSharedMgr; + this.cpProgress = cpProgress; + this.stateChecker = stateChecker; } /** {@inheritDoc} */ @Override public void onMarkDirty(boolean isPageInCheckpoint) { - assert dbSharedMgr.checkpointLockIsHeldByThread(); + assert stateChecker.checkpointLockIsHeldByThread(); - AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter(); + AtomicInteger writtenPagesCntr = cpProgress.writtenPagesCounter(); if (writtenPagesCntr == null) return; // Don't throttle if checkpoint is not running. @@ -68,7 +76,7 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy { if (!shouldThrottle) { int cpWrittenPages = writtenPagesCntr.get(); - int cpTotalPages = dbSharedMgr.currentCheckpointPagesCount(); + int cpTotalPages = cpProgress.currentCheckpointPagesCount(); if (cpWrittenPages == cpTotalPages) { // Checkpoint is already in fsync stage, increasing maximum ratio of dirty pages to 3/4 http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java new file mode 100644 index 0000000..30f9633 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.FullPageId; + +/** + * Flush (write) dirty page implementation for freed page during page replacement. When possible, will be called by + * removePageForReplacement(). + */ +public interface ReplacedPageWriter { + /** + * @param fullPageId Full page ID being evicted. + * @param byteBuf Buffer with page data. + * @param tag partition update tag, increasing counter. + * @throws IgniteCheckedException if page write failed. + */ + void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java index c49f08e..b043fa9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java @@ -110,7 +110,7 @@ public class IgniteMassLoadSandboxTest extends GridCommonAbstractTest { DataRegionConfiguration regCfg = new DataRegionConfiguration() .setName("dfltMemPlc") .setMetricsEnabled(true) - .setMaxSize(2 * 1024L * 1024 * 1024) + .setMaxSize(256L * 1024 * 1024) .setPersistenceEnabled(true); DataStorageConfiguration dsCfg = new DataStorageConfiguration(); @@ -232,6 +232,8 @@ public class IgniteMassLoadSandboxTest extends GridCommonAbstractTest { // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, "true"); System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, "false"); System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, "speed"); + System.setProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE, "true"); + setWalArchAndWorkToSameVal = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 1b6fde3..a305b7f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -17,16 +17,14 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; -import java.nio.ByteBuffer; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; -import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.database.BPlusTreeSelfTest; import org.apache.ignite.internal.util.typedef.CIX3; import org.apache.ignite.testframework.junits.GridTestKernalContext; @@ -69,22 +67,17 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { provider, sizes, sharedCtx, PAGE_SIZE, - new CIX3<FullPageId, ByteBuffer, Integer>() { - @Override public void applyx(FullPageId fullPageId, ByteBuffer byteBuf, Integer tag) { - assert false : "No evictions should happen during the test"; - } + (fullPageId, byteBuf, tag) -> { + assert false : "No page replacement should happen during the test"; }, new CIX3<Long, FullPageId, PageMemoryEx>(){ @Override public void applyx(Long aLong, FullPageId fullPageId, PageMemoryEx ex) { } }, - new CheckpointLockStateChecker() { - @Override public boolean checkpointLockIsHeldByThread() { - return true; - } - }, + () -> true, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE + PageMemoryImpl.ThrottlingPolicy.NONE, + null ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index 1180164..1fc34c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; -import java.nio.ByteBuffer; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; @@ -25,11 +24,10 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; -import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.database.BPlusTreeReuseSelfTest; import org.apache.ignite.internal.util.lang.GridInClosure3X; -import org.apache.ignite.internal.util.typedef.CIX3; import org.apache.ignite.testframework.junits.GridTestKernalContext; /** @@ -70,21 +68,17 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest provider, sizes, sharedCtx, PAGE_SIZE, - new CIX3<FullPageId, ByteBuffer, Integer>() { - @Override public void applyx(FullPageId fullPageId, ByteBuffer byteBuf, Integer tag) { - assert false : "No evictions should happen during the test"; - } + (fullPageId, byteBuf, tag) -> { + assert false : "No page replacement (rotation with disk) should happen during the test"; }, new GridInClosure3X<Long, FullPageId, PageMemoryEx>() { @Override public void applyx(Long page, FullPageId fullPageId, PageMemoryEx pageMem) { } - }, new CheckpointLockStateChecker() { - @Override public boolean checkpointLockIsHeldByThread() { - return true; - } }, + () -> true, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE + PageMemoryImpl.ThrottlingPolicy.NONE, + null ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java new file mode 100644 index 0000000..94e30e7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.mem.DirectMemoryProvider; +import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.util.GridMultiCollectionWrapper; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.logger.NullLogger; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit test for delayed page replacement mode. + */ +public class IgnitePageMemReplaceDelayedWriteUnitTest { + /** CPU count. */ + private static final int CPUS = Runtime.getRuntime().availableProcessors(); + + /** 1 megabyte in bytes */ + private static final long MB = 1024L * 1024; + + /** Logger. */ + private IgniteLogger log = new NullLogger(); + + /** Page memory, published here for backward call from replacement write callback. */ + private PageMemoryImpl pageMemory; + + /** + * Test delayed eviction causes locking in page reads + * @throws IgniteCheckedException if failed. + */ + @Test + public void testReplacementWithDelayCausesLockForRead() throws IgniteCheckedException { + IgniteConfiguration cfg = getConfiguration(MB); + + AtomicInteger totalEvicted = new AtomicInteger(); + + ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> { + log.info("Evicting " + fullPageId); + + assert getLockedPages(fullPageId).contains(fullPageId); + + assert !getSegment(fullPageId).writeLock().isHeldByCurrentThread(); + + totalEvicted.incrementAndGet(); + }; + + int pageSize = 4096; + PageMemoryImpl memory = createPageMemory(cfg, pageWriter, pageSize); + + this.pageMemory = memory; + + long pagesTotal = cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().getMaxSize() / pageSize; + long markDirty = pagesTotal * 2 / 3; + for (int i = 0; i < markDirty; i++) { + long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + long ptr = memory.acquirePage(1, pageId); + + memory.releasePage(1, pageId, ptr); + } + + GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(); + int cpPages = ids.size(); + log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages"); + + for (int i = 0; i < cpPages; i++) { + long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + long ptr = memory.acquirePage(1, pageId); + memory.releasePage(1, pageId, ptr); + } + + List<Collection<FullPageId>> stripes = getAllLockedPages(); + + assert !stripes.isEmpty(); + + for (Collection<FullPageId> pageIds : stripes) { + assert pageIds.isEmpty(); + } + + assert totalEvicted.get() > 0; + + memory.stop(); + } + + /** + * Test delayed eviction causes locking in page reads + * @throws IgniteCheckedException if failed. + */ + @Test + public void testBackwardCompatibilityMode() throws IgniteCheckedException { + IgniteConfiguration cfg = getConfiguration(MB); + + AtomicInteger totalEvicted = new AtomicInteger(); + + ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> { + log.info("Evicting " + fullPageId); + + assert getSegment(fullPageId).writeLock().isHeldByCurrentThread(); + + totalEvicted.incrementAndGet(); + }; + + System.setProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE, "false"); + int pageSize = 4096; + PageMemoryImpl memory; + + try { + memory = createPageMemory(cfg, pageWriter, pageSize); + } + finally { + System.clearProperty(IgniteSystemProperties.IGNITE_DELAYED_REPLACED_PAGE_WRITE); + } + + this.pageMemory = memory; + + long pagesTotal = cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().getMaxSize() / pageSize; + long markDirty = pagesTotal * 2 / 3; + for (int i = 0; i < markDirty; i++) { + long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + long ptr = memory.acquirePage(1, pageId); + + memory.releasePage(1, pageId, ptr); + } + + GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(); + int cpPages = ids.size(); + log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages"); + + for (int i = 0; i < cpPages; i++) { + long pageId = memory.allocatePage(1, 1, PageIdAllocator.FLAG_DATA); + long ptr = memory.acquirePage(1, pageId); + memory.releasePage(1, pageId, ptr); + } + + assert totalEvicted.get() > 0; + + memory.stop(); + } + + /** + * @param fullPageId page ID to determine segment for + * @return segment related + */ + @SuppressWarnings("TypeMayBeWeakened") private ReentrantReadWriteLock getSegment(FullPageId fullPageId) { + ReentrantReadWriteLock[] segments = U.field(pageMemory, "segments"); + + int idx = PageMemoryImpl.segmentIndex(fullPageId.groupId(), fullPageId.pageId(), + segments.length); + + return segments[idx]; + } + + /** + * @param cfg configuration + * @param pageWriter writer for page replacement. + * @param pageSize page size + * @return implementation for test + */ + @NotNull + private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, ReplacedPageWriter pageWriter, int pageSize) { + IgniteCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class); + + when(db.checkpointLockIsHeldByThread()).thenReturn(true); + + GridCacheSharedContext sctx = Mockito.mock(GridCacheSharedContext.class); + + when(sctx.pageStore()).thenReturn(new NoOpPageStoreManager()); + when(sctx.wal()).thenReturn(new NoOpWALManager()); + when(sctx.database()).thenReturn(db); + when(sctx.logger(any(Class.class))).thenReturn(log); + + GridKernalContext kernalCtx = mock(GridKernalContext.class); + + when(kernalCtx.config()).thenReturn(cfg); + when(sctx.kernalContext()).thenReturn(kernalCtx); + + DataRegionConfiguration regCfg = cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration(); + + DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(regCfg); + + long[] sizes = prepareSegmentSizes(regCfg.getMaxSize()); + + DirectMemoryProvider provider = new UnsafeMemoryProvider(log); + + PageMemoryImpl memory + = new PageMemoryImpl(provider, sizes, sctx, pageSize, + pageWriter, null, () -> true, memMetrics, null, null); + + memory.start(); + return memory; + } + + /** + * @param overallSize default region size in bytes + * @return configuration for test. + */ + @NotNull private IgniteConfiguration getConfiguration(long overallSize) { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(overallSize))); + + return cfg; + } + + /** + * @return collection with pages locked + * @param fullPageId page id to check lock. + */ + private Collection<FullPageId> getLockedPages(FullPageId fullPageId) { + Object tracker = delayedReplacementTracker(); + + Object[] stripes = U.field(tracker, "stripes"); + + int idx = PageMemoryImpl.segmentIndex(fullPageId.groupId(), fullPageId.pageId(), + stripes.length); + + return U.field(stripes[idx], "locked"); + } + + /** + * @return delayed write tracked from page memory. + */ + @NotNull private Object delayedReplacementTracker() { + Object tracker = U.field(pageMemory, "delayedPageReplacementTracker"); + + if (tracker == null) + throw new IllegalStateException("Delayed replacement is not configured"); + + return tracker; + } + + /** + * @return all locked pages stripes underlying collectinos + */ + private List<Collection<FullPageId>> getAllLockedPages() { + Object tracker = delayedReplacementTracker(); + + Object[] stripes = U.field(tracker, "stripes"); + + Stream<Collection<FullPageId>> locked = Arrays.asList(stripes).stream().map(stripe -> + (Collection<FullPageId>)U.field(stripe, "locked")); + + return locked.collect(Collectors.toList()); + } + + /** + * @param overallSize all regions size + * @return segments size, cp pool is latest array element. + */ + private long[] prepareSegmentSizes(long overallSize) { + int segments = CPUS; + long[] sizes = new long[segments + 1]; + + for (int i = 0; i < sizes.length; i++) + sizes[i] = overallSize / segments; + + sizes[segments] = overallSize / 100; + + return sizes; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java index 054696c..1cef087 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java @@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.logger.NullLogger; import org.junit.Test; @@ -42,6 +44,9 @@ public class IgniteThrottlingUnitTest { /** Page memory 2 g. */ private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class); + /** State checker. */ + private CheckpointLockStateChecker stateChecker = () -> true; + { when(pageMemory2g.totalPages()).thenReturn((2L * 1024 * 1024 * 1024) / 4096); } @@ -51,7 +56,7 @@ public class IgniteThrottlingUnitTest { */ @Test public void breakInCaseTooFast() { - PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); long time = throttle.getParkTime(0.67, (362584 + 67064) / 2, @@ -68,7 +73,7 @@ public class IgniteThrottlingUnitTest { */ @Test public void noBreakIfNotFastWrite() { - PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); long time = throttle.getParkTime(0.47, ((362584 + 67064) / 2), @@ -150,7 +155,7 @@ public class IgniteThrottlingUnitTest { */ @Test public void beginOfCp() { - PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); assertTrue(throttle.getParkTime(0.01, 100,400000, 1, @@ -177,7 +182,7 @@ public class IgniteThrottlingUnitTest { */ @Test public void enforceThrottleAtTheEndOfCp() { - PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); long time1 = throttle.getParkTime(0.70, 300000, 400000, 1, 20200, 23000); @@ -200,7 +205,7 @@ public class IgniteThrottlingUnitTest { */ @Test public void tooMuchPagesMarkedDirty() { - PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log); + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, stateChecker, log); // 363308 350004 348976 10604 long time = throttle.getParkTime(0.75, @@ -234,11 +239,10 @@ public class IgniteThrottlingUnitTest { }).when(log).info(anyString()); AtomicInteger written = new AtomicInteger(); - GridCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class); - when(db.checkpointLockIsHeldByThread()).thenReturn(true); - when(db.writtenPagesCounter()).thenReturn(written); + CheckpointWriteProgressSupplier cpProgress = mock(CheckpointWriteProgressSupplier.class); + when(cpProgress.writtenPagesCounter()).thenReturn(written); - PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, db, log) { + PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, cpProgress, stateChecker, log) { @Override protected void doPark(long throttleParkTimeNs) { //do nothing } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java index 04f3bd0..4495dc1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import java.io.File; -import java.nio.ByteBuffer; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; @@ -26,11 +25,10 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; -import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.database.IndexStorageSelfTest; import org.apache.ignite.internal.util.lang.GridInClosure3X; -import org.apache.ignite.internal.util.typedef.CIX3; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.GridTestKernalContext; @@ -85,21 +83,17 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest { provider, sizes, sharedCtx, PAGE_SIZE, - new CIX3<FullPageId, ByteBuffer, Integer>() { - @Override public void applyx(FullPageId fullPageId, ByteBuffer byteBuf, Integer tag) { - assert false : "No evictions should happen during the test"; - } + (fullPageId, byteBuf, tag) -> { + assert false : "No page replacement (rotation with disk) should happen during the test"; }, new GridInClosure3X<Long, FullPageId, PageMemoryEx>() { @Override public void applyx(Long page, FullPageId fullId, PageMemoryEx pageMem) { } - }, new CheckpointLockStateChecker() { - @Override public boolean checkpointLockIsHeldByThread() { - return true; - } }, + () -> true, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE + PageMemoryImpl.ThrottlingPolicy.NONE, + null ); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index ed285c5..3c169be 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import java.io.File; -import java.nio.ByteBuffer; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; @@ -28,9 +27,8 @@ import org.apache.ignite.internal.pagemem.impl.PageMemoryNoLoadSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; -import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; -import org.apache.ignite.internal.util.typedef.CIX3; +import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.GridTestKernalContext; @@ -75,10 +73,8 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { sizes, sharedCtx, PAGE_SIZE, - new CIX3<FullPageId, ByteBuffer, Integer>() { - @Override public void applyx(FullPageId fullPageId, ByteBuffer byteBuffer, Integer tag) { - assert false : "No evictions should happen during the test"; - } + (fullPageId, byteBuf, tag) -> { + assert false : "No page replacement (rotation with disk) should happen during the test"; }, new GridInClosure3X<Long, FullPageId, PageMemoryEx>() { @Override public void applyx(Long page, FullPageId fullId, PageMemoryEx pageMem) { @@ -90,7 +86,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { } }, new DataRegionMetricsImpl(new DataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE + PageMemoryImpl.ThrottlingPolicy.NONE, null ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 1369f28..8f0ef39 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; -import java.nio.ByteBuffer; import java.util.Collections; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -28,11 +27,10 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker; -import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.util.lang.GridInClosure3X; -import org.apache.ignite.internal.util.typedef.CIX3; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -108,10 +106,8 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { sizes, sharedCtx, PAGE_SIZE, - new CIX3<FullPageId, ByteBuffer, Integer>() { - @Override public void applyx(FullPageId fullPageId, ByteBuffer byteBuf, Integer tag) { - assert false : "No evictions should happen during the test"; - } + (fullPageId, byteBuf, tag) -> { + assert false : "No page replacement (rotation with disk) should happen during the test"; }, new GridInClosure3X<Long, FullPageId, PageMemoryEx>() { @Override public void applyx(Long page, FullPageId fullId, PageMemoryEx pageMem) { @@ -122,7 +118,8 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { } }, new DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()), - PageMemoryImpl.ThrottlingPolicy.NONE + PageMemoryImpl.ThrottlingPolicy.NONE, + null ); mem.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2f8cf8f/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java index a2bfeb3..9906429 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.testsuites; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.IgnitePageMemReplaceDelayedWriteUnitTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.IgniteThrottlingUnitTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -25,7 +26,8 @@ import org.junit.runners.Suite; */ @RunWith(Suite.class) @Suite.SuiteClasses({ - IgniteThrottlingUnitTest.class + IgniteThrottlingUnitTest.class, + IgnitePageMemReplaceDelayedWriteUnitTest.class }) public class IgnitePdsUnitTestSuite { }
