This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 4ede2ee2f7f65dba33f263d0fee7cb26709858d7 Author: Alexey Goncharuk <[email protected]> AuthorDate: Mon Dec 2 19:51:31 2019 +0300 IGNITE-12430 Move PagePool to a separate class --- .../cache/persistence/pagemem/PageHeader.java | 274 ++++++++++++ .../cache/persistence/pagemem/PageMemoryImpl.java | 487 +-------------------- .../cache/persistence/pagemem/PagePool.java | 243 ++++++++++ .../apache/ignite/internal/util/GridUnsafe.java | 20 + .../apache/ignite/internal/util/IgniteUtils.java | 15 + .../cache/persistence/pagemem/PagePoolTest.java | 337 ++++++++++++++ .../ignite/internal/util/IgniteUtilsSelfTest.java | 19 + .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 8 files changed, 930 insertions(+), 467 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageHeader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageHeader.java new file mode 100644 index 0000000..20930ce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageHeader.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.util.GridUnsafe; + +/** + * + */ +class PageHeader { + /** */ + public static final long PAGE_MARKER = 0x0000000000000001L; + + /** Dirty flag. */ + private static final long DIRTY_FLAG = 0x0100000000000000L; + + /** Page relative pointer. Does not change once a page is allocated. */ + private static final int RELATIVE_PTR_OFFSET = 8; + + /** Page ID offset */ + private static final int PAGE_ID_OFFSET = 16; + + /** Page cache group ID offset. */ + private static final int PAGE_CACHE_ID_OFFSET = 24; + + /** Page pin counter offset. */ + private static final int PAGE_PIN_CNT_OFFSET = 28; + + /** Page temp copy buffer relative pointer offset. */ + private static final int PAGE_TMP_BUF_OFFSET = 40; + + /** + * @param absPtr Absolute pointer to initialize. + * @param relative Relative pointer to write. + */ + public static void initNew(long absPtr, long relative) { + relative(absPtr, relative); + + tempBufferPointer(absPtr, PageMemoryImpl.INVALID_REL_PTR); + + GridUnsafe.putLong(absPtr, PAGE_MARKER); + GridUnsafe.putInt(absPtr + PAGE_PIN_CNT_OFFSET, 0); + } + + /** + * @param absPtr Absolute pointer. + * @return Dirty flag. + */ + public static boolean dirty(long absPtr) { + return flag(absPtr, DIRTY_FLAG); + } + + /** + * @param absPtr Page absolute pointer. + * @param dirty Dirty flag. + * @return Previous value of dirty flag. + */ + public static boolean dirty(long absPtr, boolean dirty) { + return flag(absPtr, DIRTY_FLAG, dirty); + } + + /** + * @param absPtr Absolute pointer. + * @param flag Flag mask. + * @return Flag value. 
+ */ + private static boolean flag(long absPtr, long flag) { + assert (flag & 0xFFFFFFFFFFFFFFL) == 0; + assert Long.bitCount(flag) == 1; + + long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET); + + return (relPtrWithFlags & flag) != 0; + } + + /** + * Sets flag. + * + * @param absPtr Absolute pointer. + * @param flag Flag mask. + * @param set New flag value. + * @return Previous flag value. + */ + private static boolean flag(long absPtr, long flag, boolean set) { + assert (flag & 0xFFFFFFFFFFFFFFL) == 0; + assert Long.bitCount(flag) == 1; + + long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET); + + boolean was = (relPtrWithFlags & flag) != 0; + + if (set) + relPtrWithFlags |= flag; + else + relPtrWithFlags &= ~flag; + + GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtrWithFlags); + + return was; + } + + /** + * @param absPtr Page pointer. + * @return If page is pinned. + */ + public static boolean isAcquired(long absPtr) { + return GridUnsafe.getInt(absPtr + PAGE_PIN_CNT_OFFSET) > 0; + } + + /** + * @param absPtr Absolute pointer. + */ + public static void acquirePage(long absPtr) { + GridUnsafe.incrementAndGetInt(absPtr + PAGE_PIN_CNT_OFFSET); + } + + /** + * @param absPtr Absolute pointer. + */ + public static int releasePage(long absPtr) { + return GridUnsafe.decrementAndGetInt(absPtr + PAGE_PIN_CNT_OFFSET); + } + + /** + * @param absPtr Absolute pointer to the page header. + * @return Number of acquires for the page (the pin counter stored at {@code PAGE_PIN_CNT_OFFSET}). + */ + public static int pinCount(long absPtr) { + return GridUnsafe.getIntVolatile(null, absPtr + PAGE_PIN_CNT_OFFSET); + } + + /** + * Reads relative pointer from the page at the given absolute position. + * + * @param absPtr Absolute memory pointer to the page header. + * @return Relative pointer written to the page. 
+ */ + public static long readRelative(long absPtr) { + return GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET) & PageMemoryImpl.RELATIVE_PTR_MASK; + } + + /** + * Writes relative pointer to the page at the given absolute position. + * + * @param absPtr Absolute memory pointer to the page header. + * @param relPtr Relative pointer to write. + */ + public static void relative(long absPtr, long relPtr) { + GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtr & PageMemoryImpl.RELATIVE_PTR_MASK); + } + + /** + * Volatile write of the given timestamp to the page at the {@code absPtr} address; the lowest byte is replaced by the page marker. + * + * @param absPtr Absolute page address. + */ + public static void writeTimestamp(final long absPtr, long tstamp) { + tstamp >>= 8; + + GridUnsafe.putLongVolatile(null, absPtr, (tstamp << 8) | 0x01); + } + + /** + * Reads the timestamp from the page at the {@code absPtr} address. + * + * @param absPtr Absolute page address. + * @return Timestamp with the lowest byte cleared. + */ + public static long readTimestamp(final long absPtr) { + long markerAndTs = GridUnsafe.getLong(absPtr); + + // Clear last byte as it is occupied by page marker. + return markerAndTs & ~0xFF; + } + + /** + * Sets pointer to checkpoint buffer. + * + * @param absPtr Page absolute pointer. + * @param tmpRelPtr Temp buffer relative pointer or {@link PageMemoryImpl#INVALID_REL_PTR} if page is not copied to checkpoint + * buffer. + */ + public static void tempBufferPointer(long absPtr, long tmpRelPtr) { + GridUnsafe.putLong(absPtr + PAGE_TMP_BUF_OFFSET, tmpRelPtr); + } + + /** + * Gets pointer to checkpoint buffer or {@link PageMemoryImpl#INVALID_REL_PTR} if page is not copied to checkpoint buffer. + * + * @param absPtr Page absolute pointer. + * @return Temp buffer relative pointer. + */ + public static long tempBufferPointer(long absPtr) { + return GridUnsafe.getLong(absPtr + PAGE_TMP_BUF_OFFSET); + } + + /** + * Reads page ID from the page at the given absolute position. + * + * @param absPtr Absolute memory pointer to the page header. 
+ * @return Page ID written to the page. + */ + public static long readPageId(long absPtr) { + return GridUnsafe.getLong(absPtr + PAGE_ID_OFFSET); + } + + /** + * Writes page ID to the page at the given absolute position. + * + * @param absPtr Absolute memory pointer to the page header. + * @param pageId Page ID to write. + */ + private static void pageId(long absPtr, long pageId) { + GridUnsafe.putLong(absPtr + PAGE_ID_OFFSET, pageId); + } + + /** + * Reads cache group ID from the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @return Cache group ID written to the page. + */ + private static int readPageGroupId(final long absPtr) { + return GridUnsafe.getInt(absPtr + PAGE_CACHE_ID_OFFSET); + } + + /** + * Writes cache group ID to the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @param grpId Cache group ID to write. + */ + private static void pageGroupId(final long absPtr, final int grpId) { + GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, grpId); + } + + /** + * Reads page ID and cache group ID from the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @return Full page ID written to the page. + */ + public static FullPageId fullPageId(final long absPtr) { + return new FullPageId(readPageId(absPtr), readPageGroupId(absPtr)); + } + + /** + * Writes page ID and cache group ID to the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @param fullPageId Full page ID to write. 
+ */ + public static void fullPageId(final long absPtr, final FullPageId fullPageId) { + pageId(absPtr, fullPageId.pageId()); + + pageGroupId(absPtr, fullPageId.groupId()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 7adf1c5..b0cc7bd 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -32,7 +32,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -85,7 +84,6 @@ import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.OffheapReadWriteLock; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.lang.GridInClosure3X; -import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -125,51 +123,18 @@ import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer; */ @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) public class PageMemoryImpl implements PageMemoryEx { - /** */ - public static final long PAGE_MARKER = 0x0000000000000001L; - - /** Relative pointer chunk index mask. 
*/ - private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L; - /** Full relative pointer mask. */ - private static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL; - - /** Dirty flag. */ - private static final long DIRTY_FLAG = 0x0100000000000000L; + public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL; /** Invalid relative pointer value. */ - private static final long INVALID_REL_PTR = RELATIVE_PTR_MASK; + public static final long INVALID_REL_PTR = RELATIVE_PTR_MASK; /** Pointer which means that this page is outdated (for example, cache was destroyed, partition eviction'd happened */ private static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1; - /** Address mask to avoid ABA problem. */ - private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL; - - /** Counter mask to avoid ABA problem. */ - private static final long COUNTER_MASK = ~ADDRESS_MASK; - - /** Counter increment to avoid ABA problem. */ - private static final long COUNTER_INC = ADDRESS_MASK + 1; - - /** Page relative pointer. Does not change once a page is allocated. */ - public static final int RELATIVE_PTR_OFFSET = 8; - - /** Page ID offset */ - public static final int PAGE_ID_OFFSET = 16; - - /** Page cache group ID offset. */ - public static final int PAGE_CACHE_ID_OFFSET = 24; - - /** Page pin counter offset. */ - public static final int PAGE_PIN_CNT_OFFSET = 28; - /** Page lock offset. */ public static final int PAGE_LOCK_OFFSET = 32; - /** Page temp copy buffer relative pointer offset. */ - public static final int PAGE_TMP_BUF_OFFSET = 40; - /** * 8b Marker/timestamp * 8b Relative pointer @@ -206,20 +171,17 @@ public class PageMemoryImpl implements PageMemoryEx { /** Checkpoint lock state provider. */ private final CheckpointLockStateChecker stateChecker; - /** Number of used pages in checkpoint buffer. 
*/ - private final AtomicInteger cpBufPagesCntr = new AtomicInteger(0); - /** Use new implementation of loaded pages table: 'Robin Hood hashing: backward shift deletion'. */ private final boolean useBackwardShiftMap = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP, true); /** */ - private ExecutorService asyncRunner = new ThreadPoolExecutor( + private final ExecutorService asyncRunner = new ThreadPoolExecutor( 0, Runtime.getRuntime().availableProcessors(), 30L, TimeUnit.SECONDS, - new ArrayBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors())); + new ArrayBlockingQueue<>(Runtime.getRuntime().availableProcessors())); /** Page store manager. */ private IgnitePageStoreManager storeMgr; @@ -243,7 +205,7 @@ public class PageMemoryImpl implements PageMemoryEx { private volatile Segment[] segments; /** Lock for segments changes. */ - private Object segmentsLock = new Object(); + private final Object segmentsLock = new Object(); /** */ private PagePool checkpointPool; @@ -378,7 +340,7 @@ public class PageMemoryImpl implements PageMemoryEx { DirectMemoryRegion cpReg = regions.get(regs - 1); - checkpointPool = new PagePool(regs - 1, cpReg, cpBufPagesCntr); + checkpointPool = new PagePool(regs - 1, cpReg, sysPageSize, rwLock); long checkpointBuf = cpReg.size(); @@ -582,7 +544,7 @@ public class PageMemoryImpl implements PageMemoryEx { assert !PageHeader.isAcquired(absPtr) : "Pin counter must be 0 for a new page [relPtr=" + U.hexLong(relPtr) + - ", absPtr=" + U.hexLong(absPtr) + ", pinCntr=" + GridUnsafe.getInt(absPtr + PAGE_PIN_CNT_OFFSET) + ']'; + ", absPtr=" + U.hexLong(absPtr) + ", pinCntr=" + PageHeader.pinCount(absPtr) + ']'; setDirty(fullId, absPtr, true, true); @@ -676,21 +638,21 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public boolean freePage(int grpId, long pageId) throws IgniteCheckedException { + @Override public boolean freePage(int grpId, long pageId) 
{ assert false : "Free page should be never called directly when persistence is enabled."; return false; } /** {@inheritDoc} */ - @Override public long metaPageId(int grpId) throws IgniteCheckedException { + @Override public long metaPageId(int grpId) { assert started; return storeMgr.metaPageId(grpId); } /** {@inheritDoc} */ - @Override public long partitionMetaPageId(int grpId, int partId) throws IgniteCheckedException { + @Override public long partitionMetaPageId(int grpId, int partId) { assert started; return PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, 0); @@ -1103,9 +1065,8 @@ public class PageMemoryImpl implements PageMemoryEx { double res = 0; - for (Segment segment : segments) { + for (Segment segment : segments) res = Math.max(res, segment.getDirtyPagesRatio()); - } return res; } @@ -1119,9 +1080,8 @@ public class PageMemoryImpl implements PageMemoryEx { long res = 0; - for (Segment segment : segments) { + for (Segment segment : segments) res += segment.pages(); - } return res; } @@ -1630,7 +1590,7 @@ public class PageMemoryImpl implements PageMemoryEx { // Create a buffer copy if the page is scheduled for a checkpoint. if (isInCheckpoint(fullId) && PageHeader.tempBufferPointer(absPtr) == INVALID_REL_PTR) { - long tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(fullId.pageId()); + long tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(PageIdUtils.tag(fullId.pageId())); if (tmpRelPtr == INVALID_REL_PTR) { rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS); @@ -1669,10 +1629,10 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param page Page pointer. * @param fullId full page ID. - * @param walPlc - * @param walPlc Full page WAL record policy. - * @param markDirty set dirty flag to page. 
- * @param restore + * @param walPlc WAL policy + * @param walPlc Full page WAL record policy + * @param markDirty set dirty flag to page + * @param restore restore flag */ private void writeUnlockPage( long page, @@ -1807,7 +1767,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** {@inheritDoc} */ @Override public int checkpointBufferPagesCount() { - return cpBufPagesCntr.get(); + return checkpointPool.size(); } /** @@ -1888,189 +1848,6 @@ public class PageMemoryImpl implements PageMemoryEx { } /** - * - */ - private class PagePool { - /** Segment index. */ - protected final int idx; - - /** Direct memory region. */ - protected final DirectMemoryRegion region; - - /** Pool pages counter. */ - protected final AtomicInteger pagesCntr; - - /** */ - protected long lastAllocatedIdxPtr; - - /** Pointer to the address of the free page list. */ - protected long freePageListPtr; - - /** Pages base. */ - protected long pagesBase; - - /** - * @param idx Index. - * @param region Region - * @param pagesCntr Pages counter. - */ - protected PagePool(int idx, DirectMemoryRegion region, AtomicInteger pagesCntr) { - this.idx = idx; - this.region = region; - this.pagesCntr = pagesCntr; - - long base = (region.address() + 7) & ~0x7; - - freePageListPtr = base; - - base += 8; - - lastAllocatedIdxPtr = base; - - base += 8; - - // Align page start by - pagesBase = base; - - GridUnsafe.putLong(freePageListPtr, INVALID_REL_PTR); - GridUnsafe.putLong(lastAllocatedIdxPtr, 1L); - } - - /** - * Allocates a new free page. - * - * @param pageId Page ID to to initialize. - * @return Relative pointer to the allocated page. - * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page. - */ - private long borrowOrAllocateFreePage(long pageId) throws GridOffHeapOutOfMemoryException { - if (pagesCntr != null) - pagesCntr.getAndIncrement(); - - long relPtr = borrowFreePage(); - - return relPtr != INVALID_REL_PTR ? 
relPtr : allocateFreePage(pageId); - } - - /** - * @return Relative pointer to a free page that was borrowed from the allocated pool. - */ - private long borrowFreePage() { - while (true) { - long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr); - - long freePageRelPtr = freePageRelPtrMasked & ADDRESS_MASK; - - if (freePageRelPtr != INVALID_REL_PTR) { - long freePageAbsPtr = absolute(freePageRelPtr); - - long nextFreePageRelPtr = GridUnsafe.getLong(freePageAbsPtr) & ADDRESS_MASK; - - long cnt = ((freePageRelPtrMasked & COUNTER_MASK) + COUNTER_INC) & COUNTER_MASK; - - if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, nextFreePageRelPtr | cnt)) { - GridUnsafe.putLong(freePageAbsPtr, PAGE_MARKER); - - return freePageRelPtr; - } - } - else - return INVALID_REL_PTR; - } - } - - /** - * @param pageId Page ID. - * @return Relative pointer of the allocated page. - * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page. - */ - private long allocateFreePage(long pageId) throws GridOffHeapOutOfMemoryException { - long limit = region.address() + region.size(); - - while (true) { - long lastIdx = GridUnsafe.getLong(lastAllocatedIdxPtr); - - // Check if we have enough space to allocate a page. - if (pagesBase + (lastIdx + 1) * sysPageSize > limit) - return INVALID_REL_PTR; - - if (GridUnsafe.compareAndSwapLong(null, lastAllocatedIdxPtr, lastIdx, lastIdx + 1)) { - long absPtr = pagesBase + lastIdx * sysPageSize; - - assert (lastIdx & SEGMENT_INDEX_MASK) == 0L; - - long relative = relative(lastIdx); - - assert relative != INVALID_REL_PTR; - - PageHeader.initNew(absPtr, relative); - - rwLock.init(absPtr + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); - - return relative; - } - } - } - - /** - * @param relPtr Relative pointer to free. - * @return Resulting number of pages in pool if pages counter is enabled, 0 otherwise. 
- */ - private int releaseFreePage(long relPtr) { - long absPtr = absolute(relPtr); - - assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr); - - int resCntr = 0; - - if (pagesCntr != null) - resCntr = pagesCntr.decrementAndGet(); - - while (true) { - long freePageRelPtrMasked = GridUnsafe.getLong(freePageListPtr); - - long freePageRelPtr = freePageRelPtrMasked & RELATIVE_PTR_MASK; - - GridUnsafe.putLong(absPtr, freePageRelPtr); - - if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, relPtr)) - return resCntr; - } - } - - /** - * @param relativePtr Relative pointer. - * @return Absolute pointer. - */ - long absolute(long relativePtr) { - int segIdx = (int)((relativePtr >> 40) & 0xFFFF); - - assert segIdx == idx : "expected=" + idx + ", actual=" + segIdx; - - long pageIdx = relativePtr & ~SEGMENT_INDEX_MASK; - - long off = pageIdx * sysPageSize; - - return pagesBase + off; - } - - /** - * @param pageIdx Page index in the pool. - * @return Relative pointer. - */ - private long relative(long pageIdx) { - return pageIdx | ((long)idx) << 40; - } - - /** - * @return Max number of pages in the pool. - */ - private int pages() { - return (int)((region.size() - (pagesBase - region.address())) / sysPageSize); - } - } - - /** * Gets a collection of all pages currently marked as dirty. Will create a collection copy. * * @return Collection of all page IDs marked as dirty. @@ -2160,7 +1937,7 @@ public class PageMemoryImpl implements PageMemoryEx { DirectMemoryRegion poolRegion = region.slice(memPerTbl + ldPagesMapOffInRegion); - pool = new PagePool(idx, poolRegion, null); + pool = new PagePool(idx, poolRegion, sysPageSize, rwLock); maxDirtyPages = throttlingPlc != ThrottlingPolicy.DISABLED ? pool.pages() * 3 / 4 @@ -2246,7 +2023,7 @@ public class PageMemoryImpl implements PageMemoryEx { * @return Page relative pointer. 
*/ private long borrowOrAllocateFreePage(long pageId) { - return pool.borrowOrAllocateFreePage(pageId); + return pool.borrowOrAllocateFreePage(PageIdUtils.tag(pageId)); } /** @@ -2702,230 +2479,6 @@ public class PageMemoryImpl implements PageMemoryEx { /** * */ - private static class PageHeader { - /** - * @param absPtr Absolute pointer to initialize. - * @param relative Relative pointer to write. - */ - private static void initNew(long absPtr, long relative) { - relative(absPtr, relative); - - tempBufferPointer(absPtr, INVALID_REL_PTR); - - GridUnsafe.putLong(absPtr, PAGE_MARKER); - GridUnsafe.putInt(absPtr + PAGE_PIN_CNT_OFFSET, 0); - } - - /** - * @param absPtr Absolute pointer. - * @return Dirty flag. - */ - private static boolean dirty(long absPtr) { - return flag(absPtr, DIRTY_FLAG); - } - - /** - * @param absPtr Page absolute pointer. - * @param dirty Dirty flag. - * @return Previous value of dirty flag. - */ - private static boolean dirty(long absPtr, boolean dirty) { - return flag(absPtr, DIRTY_FLAG, dirty); - } - - /** - * @param absPtr Absolute pointer. - * @param flag Flag mask. - * @return Flag value. - */ - private static boolean flag(long absPtr, long flag) { - assert (flag & 0xFFFFFFFFFFFFFFL) == 0; - assert Long.bitCount(flag) == 1; - - long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET); - - return (relPtrWithFlags & flag) != 0; - } - - /** - * Sets flag. - * - * @param absPtr Absolute pointer. - * @param flag Flag mask. - * @param set New flag value. - * @return Previous flag value. 
- */ - private static boolean flag(long absPtr, long flag, boolean set) { - assert (flag & 0xFFFFFFFFFFFFFFL) == 0; - assert Long.bitCount(flag) == 1; - - long relPtrWithFlags = GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET); - - boolean was = (relPtrWithFlags & flag) != 0; - - if (set) - relPtrWithFlags |= flag; - else - relPtrWithFlags &= ~flag; - - GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtrWithFlags); - - return was; - } - - /** - * @param absPtr Page pointer. - * @return If page is pinned. - */ - private static boolean isAcquired(long absPtr) { - return GridUnsafe.getInt(absPtr + PAGE_PIN_CNT_OFFSET) > 0; - } - - /** - * @param absPtr Absolute pointer. - */ - private static void acquirePage(long absPtr) { - updateAtomicInt(absPtr + PAGE_PIN_CNT_OFFSET, 1); - } - - /** - * @param absPtr Absolute pointer. - */ - private static int releasePage(long absPtr) { - return updateAtomicInt(absPtr + PAGE_PIN_CNT_OFFSET, -1); - } - - /** - * Reads relative pointer from the page at the given absolute position. - * - * @param absPtr Absolute memory pointer to the page header. - * @return Relative pointer written to the page. - */ - private static long readRelative(long absPtr) { - return GridUnsafe.getLong(absPtr + RELATIVE_PTR_OFFSET) & RELATIVE_PTR_MASK; - } - - /** - * Writes relative pointer to the page at the given absolute position. - * - * @param absPtr Absolute memory pointer to the page header. - * @param relPtr Relative pointer to write. - */ - private static void relative(long absPtr, long relPtr) { - GridUnsafe.putLong(absPtr + RELATIVE_PTR_OFFSET, relPtr & RELATIVE_PTR_MASK); - } - - /** - * Volatile write for current timestamp to page in {@code absAddr} address. - * - * @param absPtr Absolute page address. - */ - private static void writeTimestamp(final long absPtr, long tstamp) { - tstamp >>= 8; - - GridUnsafe.putLongVolatile(null, absPtr, (tstamp << 8) | 0x01); - } - - /** - * Read for timestamp from page in {@code absAddr} address. 
- * - * @param absPtr Absolute page address. - * @return Timestamp. - */ - private static long readTimestamp(final long absPtr) { - long markerAndTs = GridUnsafe.getLong(absPtr); - - // Clear last byte as it is occupied by page marker. - return markerAndTs & ~0xFF; - } - - /** - * Sets pointer to checkpoint buffer. - * - * @param absPtr Page absolute pointer. - * @param tmpRelPtr Temp buffer relative pointer or {@link #INVALID_REL_PTR} if page is not copied to checkpoint - * buffer. - */ - private static void tempBufferPointer(long absPtr, long tmpRelPtr) { - GridUnsafe.putLong(absPtr + PAGE_TMP_BUF_OFFSET, tmpRelPtr); - } - - /** - * Gets pointer to checkpoint buffer or {@link #INVALID_REL_PTR} if page is not copied to checkpoint buffer. - * - * @param absPtr Page absolute pointer. - * @return Temp buffer relative pointer. - */ - private static long tempBufferPointer(long absPtr) { - return GridUnsafe.getLong(absPtr + PAGE_TMP_BUF_OFFSET); - } - - /** - * Reads page ID from the page at the given absolute position. - * - * @param absPtr Absolute memory pointer to the page header. - * @return Page ID written to the page. - */ - private static long readPageId(long absPtr) { - return GridUnsafe.getLong(absPtr + PAGE_ID_OFFSET); - } - - /** - * Writes page ID to the page at the given absolute position. - * - * @param absPtr Absolute memory pointer to the page header. - * @param pageId Page ID to write. - */ - private static void pageId(long absPtr, long pageId) { - GridUnsafe.putLong(absPtr + PAGE_ID_OFFSET, pageId); - } - - /** - * Reads cache group ID from the page at the given absolute pointer. - * - * @param absPtr Absolute memory pointer to the page header. - * @return Cache group ID written to the page. - */ - private static int readPageGroupId(final long absPtr) { - return GridUnsafe.getInt(absPtr + PAGE_CACHE_ID_OFFSET); - } - - /** - * Writes cache group ID from the page at the given absolute pointer. 
- * - * @param absPtr Absolute memory pointer to the page header. - * @param grpId Cache group ID to write. - */ - private static void pageGroupId(final long absPtr, final int grpId) { - GridUnsafe.putInt(absPtr + PAGE_CACHE_ID_OFFSET, grpId); - } - - /** - * Reads page ID and cache group ID from the page at the given absolute pointer. - * - * @param absPtr Absolute memory pointer to the page header. - * @return Full page ID written to the page. - */ - private static FullPageId fullPageId(final long absPtr) { - return new FullPageId(readPageId(absPtr), readPageGroupId(absPtr)); - } - - /** - * Writes page ID and cache group ID from the page at the given absolute pointer. - * - * @param absPtr Absolute memory pointer to the page header. - * @param fullPageId Full page ID to write. - */ - private static void fullPageId(final long absPtr, final FullPageId fullPageId) { - pageId(absPtr, fullPageId.pageId()); - - pageGroupId(absPtr, fullPageId.groupId()); - } - } - - /** - * - */ private static class ClearSegmentRunnable implements Runnable { /** */ private Segment seg; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java new file mode 100644 index 0000000..d267b36 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePool.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Pool of equally sized pages placed in a single {@link DirectMemoryRegion}.
+ * <p>
+ * Region layout, starting from the region address rounded up to 8 bytes: an 8-byte head of the free page list,
+ * an 8-byte counter of sequentially allocated pages, then the pages themselves, {@code sysPageSize} bytes each.
+ * <p>
+ * A page is either borrowed from the list of previously released pages or, when the list is empty, allocated
+ * sequentially by advancing the counter. Both structures are updated with CAS loops. The head of the free list
+ * packs the relative pointer of the first free page into the lower 56 bits and a modification counter into the
+ * upper 8 bits; the counter protects the CAS loops from the ABA problem.
+ */
+public class PagePool {
+    /** Relative pointer chunk index mask. */
+    private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
+
+    /** Address mask to avoid ABA problem (lower 56 bits of the free list head). */
+    private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Counter increment to avoid ABA problem. */
+    private static final long COUNTER_INC = ADDRESS_MASK + 1;
+
+    /** Counter mask to avoid ABA problem (upper 8 bits of the free list head). */
+    private static final long COUNTER_MASK = ~ADDRESS_MASK;
+
+    /** Segment index. */
+    protected final int idx;
+
+    /** Direct memory region. */
+    protected final DirectMemoryRegion region;
+
+    /** Counter of pages currently borrowed from the pool. */
+    protected final AtomicInteger pagesCntr = new AtomicInteger();
+
+    /** Pointer to the counter of sequentially allocated pages. */
+    protected long lastAllocatedIdxPtr;
+
+    /** Pointer to the address of the free page list. */
+    protected long freePageListPtr;
+
+    /** Pages base. */
+    protected long pagesBase;
+
+    /** System page size. */
+    private final int sysPageSize;
+
+    /** Lock used to initialize RW locks of newly allocated pages. */
+    private final OffheapReadWriteLock rwLock;
+
+    /**
+     * @param idx Segment index.
+     * @param region Direct memory region to place the pool into.
+     * @param sysPageSize System page size, in bytes.
+     * @param rwLock Lock used to initialize RW locks of newly allocated pages.
+     */
+    protected PagePool(
+        int idx,
+        DirectMemoryRegion region,
+        int sysPageSize,
+        OffheapReadWriteLock rwLock
+    ) {
+        this.idx = idx;
+        this.region = region;
+        this.sysPageSize = sysPageSize;
+        this.rwLock = rwLock;
+
+        // Round the region start up to an 8-byte boundary.
+        long base = (region.address() + 7) & ~0x7;
+
+        freePageListPtr = base;
+
+        base += 8;
+
+        lastAllocatedIdxPtr = base;
+
+        base += 8;
+
+        // Pages start right after the two 8-byte service words.
+        pagesBase = base;
+
+        GridUnsafe.putLong(freePageListPtr, PageMemoryImpl.INVALID_REL_PTR);
+        GridUnsafe.putLong(lastAllocatedIdxPtr, 0L);
+    }
+
+    /**
+     * Borrows a page from the free list or, if the list is empty, allocates a new page from the region.
+     *
+     * @param tag Tag to initialize page RW lock.
+     * @return Relative pointer to the page or {@link PageMemoryImpl#INVALID_REL_PTR} if the pool is exhausted.
+     * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page.
+     */
+    public long borrowOrAllocateFreePage(int tag) throws GridOffHeapOutOfMemoryException {
+        long relPtr = borrowFreePage();
+
+        if (relPtr == PageMemoryImpl.INVALID_REL_PTR)
+            relPtr = allocateFreePage(tag);
+
+        if (relPtr != PageMemoryImpl.INVALID_REL_PTR)
+            pagesCntr.incrementAndGet();
+
+        return relPtr;
+    }
+
+    /**
+     * @return Relative pointer to a free page that was borrowed from the allocated pool or
+     *      {@link PageMemoryImpl#INVALID_REL_PTR} if the free list is empty.
+     */
+    private long borrowFreePage() {
+        while (true) {
+            long freePageRelPtrMasked = GridUnsafe.getLong(null, freePageListPtr);
+
+            long freePageRelPtr = freePageRelPtrMasked & ADDRESS_MASK;
+
+            if (freePageRelPtr != PageMemoryImpl.INVALID_REL_PTR) {
+                long freePageAbsPtr = absolute(freePageRelPtr);
+
+                long nextFreePageRelPtr = GridUnsafe.getLong(null, freePageAbsPtr) & ADDRESS_MASK;
+
+                // nextFreePageRelPtr may be invalid because a concurrent thread may have already polled this value
+                // and used it. The counter stored in the head makes the CAS below fail in this case.
+                long cnt = ((freePageRelPtrMasked & COUNTER_MASK) + COUNTER_INC) & COUNTER_MASK;
+
+                if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, nextFreePageRelPtr | cnt)) {
+                    GridUnsafe.putLongVolatile(null, freePageAbsPtr, PageHeader.PAGE_MARKER);
+
+                    return freePageRelPtr;
+                }
+            }
+            else
+                return PageMemoryImpl.INVALID_REL_PTR;
+        }
+    }
+
+    /**
+     * Allocates the next never-used page of the region by advancing the allocated pages counter.
+     *
+     * @param tag Tag to initialize page RW lock.
+     * @return Relative pointer of the allocated page or {@link PageMemoryImpl#INVALID_REL_PTR} if there is
+     *      no space left in the region.
+     * @throws GridOffHeapOutOfMemoryException If failed to allocate new free page.
+     */
+    private long allocateFreePage(int tag) throws GridOffHeapOutOfMemoryException {
+        long limit = region.address() + region.size();
+
+        while (true) {
+            long lastIdx = GridUnsafe.getLong(null, lastAllocatedIdxPtr);
+
+            // Check if we have enough space to allocate a page.
+            if (pagesBase + (lastIdx + 1) * sysPageSize > limit)
+                return PageMemoryImpl.INVALID_REL_PTR;
+
+            if (GridUnsafe.compareAndSwapLong(null, lastAllocatedIdxPtr, lastIdx, lastIdx + 1)) {
+                long absPtr = pagesBase + lastIdx * sysPageSize;
+
+                assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+                long relative = relative(lastIdx);
+
+                assert relative != PageMemoryImpl.INVALID_REL_PTR;
+
+                PageHeader.initNew(absPtr, relative);
+
+                rwLock.init(absPtr + PageMemoryImpl.PAGE_LOCK_OFFSET, tag);
+
+                return relative;
+            }
+        }
+    }
+
+    /**
+     * Returns the page to the head of the free list.
+     *
+     * @param relPtr Relative pointer to free.
+     * @return Number of pages that remain borrowed from the pool after this release.
+     */
+    public int releaseFreePage(long relPtr) {
+        long absPtr = absolute(relPtr);
+
+        assert !PageHeader.isAcquired(absPtr) : "Release pinned page: " + PageHeader.fullPageId(absPtr);
+
+        int resCntr = pagesCntr.decrementAndGet();
+
+        while (true) {
+            long freePageRelPtrMasked = GridUnsafe.getLong(null, freePageListPtr);
+
+            long freePageRelPtr = freePageRelPtrMasked & PageMemoryImpl.RELATIVE_PTR_MASK;
+
+            // Link the released page to the current head of the list.
+            GridUnsafe.putLong(null, absPtr, freePageRelPtr);
+
+            // Carry the modification counter over to the new head. Writing a bare relative pointer would reset
+            // the counter, and a borrower that read the head before this page was borrowed and released again
+            // could then win its CAS with a stale "next" pointer (ABA) and hand out a page that is in use.
+            long relPtrWithCnt = (relPtr & ADDRESS_MASK) | (freePageRelPtrMasked & COUNTER_MASK);
+
+            if (GridUnsafe.compareAndSwapLong(null, freePageListPtr, freePageRelPtrMasked, relPtrWithCnt))
+                return resCntr;
+        }
+    }
+
+    /**
+     * @param relativePtr Relative pointer.
+     * @return Absolute pointer.
+     */
+    long absolute(long relativePtr) {
+        int segIdx = (int)((relativePtr >> 40) & 0xFFFF);
+
+        assert segIdx == idx : "expected=" + idx + ", actual=" + segIdx + ", relativePtr=" + U.hexLong(relativePtr);
+
+        long pageIdx = relativePtr & ~SEGMENT_INDEX_MASK;
+
+        long off = pageIdx * sysPageSize;
+
+        return pagesBase + off;
+    }
+
+    /**
+     * @param pageIdx Page index in the pool.
+     * @return Relative pointer: the page index with the segment index placed starting from bit 40.
+     */
+    private long relative(long pageIdx) {
+        return pageIdx | ((long)idx) << 40;
+    }
+
+    /**
+     * @return Max number of pages in the pool.
+     */
+    public int pages() {
+        return (int)((region.size() - (pagesBase - region.address())) / sysPageSize);
+    }
+
+    /**
+     * @return Number of pages currently borrowed from the pool.
+     */
+    public int size() {
+        return pagesCntr.get();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 573a602..2197c7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -1362,6 +1362,26 @@ public abstract class GridUnsafe {
     }
 
     /**
+     * Atomically increments value stored in an integer pointed by {@code ptr}.
+     *
+     * @param ptr Pointer to an integer.
+     * @return Updated value.
+     */
+    public static int incrementAndGetInt(long ptr) {
+        return UNSAFE.getAndAddInt(null, ptr, 1) + 1;
+    }
+
+    /**
+     * Atomically decrements value stored in an integer pointed by {@code ptr}.
+     *
+     * @param ptr Pointer to an integer.
+     * @return Updated value.
+     */
+    public static int decrementAndGetInt(long ptr) {
+        return UNSAFE.getAndAddInt(null, ptr, -1) - 1;
+    }
+
+    /**
      * Gets byte value with volatile semantic.
      *
      * @param obj Object.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f6d9150..34ef7be 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8721,6 +8721,23 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Rounds the argument up to the closest power of 2 that is not less than the argument.
+     *
+     * @param v Value to round up, must be in the range {@code [0, 2^30]}.
+     * @return Closest power of 2 that is greater than or equal to {@code v}; {@code 1} for {@code 0}.
+     */
+    public static int nextPowerOf2(int v) {
+        A.ensure(v >= 0, "v must not be negative");
+        // 2^30 is the largest power of 2 that fits into int; for bigger values the shift below would overflow.
+        A.ensure(v <= (1 << 30), "v must not exceed 2^30");
+
+        if (v == 0)
+            return 1;
+
+        return 1 << (32 - Integer.numberOfLeadingZeros(v - 1));
+    }
+
+    /**
      * Gets absolute value for integer. If integer is {@link Integer#MIN_VALUE}, then {@code 0} is returned.
      *
      * @param i Integer.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePoolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePoolTest.java
new file mode 100644
index 0000000..7d17d2a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagePoolTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests borrowing and releasing of pages in {@link PagePool} from a single thread and from multiple threads.
+ * The test is parameterized with the system page size and the segment index of the pool.
+ */
+@RunWith(Parameterized.class)
+public class PagePoolTest extends GridCommonAbstractTest {
+    /**
+     * @return Test parameters: pairs of system page size and segment index.
+     */
+    @Parameterized.Parameters(name = "PageSize={0}, segment={1}")
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(
+            new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+            new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+            new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+            new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+            new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+            new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+            new Object[] {1024 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+            new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+            new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+            new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+            new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+            new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+            new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+            new Object[] {2048 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+            new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+            new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+            new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+            new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+            new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+            new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+            new Object[] {4096 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+            new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+            new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+            new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+            new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+            new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+            new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+            new Object[] {8192 + PageMemoryImpl.PAGE_OVERHEAD, 31},
+
+            new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 0},
+            new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 1},
+            new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 2},
+            new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 4},
+            new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 8},
+            new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 16},
+            new Object[] {16384 + PageMemoryImpl.PAGE_OVERHEAD, 31}
+        );
+    }
+
+    /** System page size: page size plus {@link PageMemoryImpl#PAGE_OVERHEAD}. */
+    @Parameterized.Parameter
+    public int sysPageSize;
+
+    /** Segment index passed to the pool. */
+    @Parameterized.Parameter(1)
+    public int segment;
+
+    /** Number of pages the test region is sized for. */
+    private static final int PAGES = 100;
+
+    /** Lock passed to the pool under test. */
+    private OffheapReadWriteLock rwLock = new OffheapReadWriteLock(U.nextPowerOf2(Runtime.getRuntime().availableProcessors()));
+
+    /** Memory provider the test region is taken from. */
+    private DirectMemoryProvider provider;
+
+    /** Memory region the pool is created over. */
+    private DirectMemoryRegion region;
+
+    /** Pool under test. */
+    private PagePool pool;
+
+    /**
+     * Allocates a region of {@code sysPageSize * PAGES + 16} bytes and creates the pool under test over it.
+     */
+    @Before
+    public void prepare() {
+        provider = new UnsafeMemoryProvider(log);
+        provider.initialize(new long[] {sysPageSize * PAGES + 16});
+
+        region = provider.nextRegion();
+
+        pool = new PagePool(segment, region, sysPageSize, rwLock);
+    }
+
+    /**
+     * Shuts down the memory provider.
+     */
+    @After
+    public void cleanup() {
+        provider.shutdown(true);
+    }
+
+    /**
+     * Borrows all pages of the pool, checks that the pool is exhausted, releases all pages and checks that
+     * the pages are then borrowed in the order reverse to the order of their release.
+     */
+    @Test
+    public void testSingleThreadedBorrowRelease() {
+        assertEquals(PAGES, pool.pages());
+        assertEquals(0, pool.size());
+
+        Set<Long> allocated = new LinkedHashSet<>();
+        LinkedList<Long> allocatedQueue = new LinkedList<>();
+
+        info("Region start: " + U.hexLong(region.address()));
+
+        int tag = 1;
+
+        for (int i = 0; i < PAGES; i++) {
+            long relPtr = pool.borrowOrAllocateFreePage(tag);
+
+            assertTrue("Failed for i=" + i, relPtr != PageMemoryImpl.INVALID_REL_PTR);
+
+            assertTrue(allocated.add(relPtr));
+            allocatedQueue.add(relPtr);
+
+            PageHeader.writeTimestamp(pool.absolute(relPtr), 0xFFFFFFFFFFFFFFFFL);
+
+            assertEquals(i + 1, pool.size());
+        }
+
+        info("Done allocating");
+
+        // The pool is exhausted, no more pages can be borrowed.
+        assertEquals(PageMemoryImpl.INVALID_REL_PTR, pool.borrowOrAllocateFreePage(tag));
+
+        assertEquals(PAGES, pool.size());
+
+        {
+            int i = 0;
+
+            for (Long relPtr : allocated) {
+                pool.releaseFreePage(relPtr);
+
+                i++;
+
+                assertEquals(PAGES - i, pool.size());
+            }
+        }
+
+        info("Done releasing");
+
+        assertEquals(0, pool.size());
+
+        {
+            // Pages were released in the allocation order, so they must be borrowed in the reverse order.
+            Iterator<Long> it = allocatedQueue.descendingIterator();
+
+            int i = 0;
+
+            while (it.hasNext()) {
+                long relPtr = it.next();
+
+                long fromPool = pool.borrowOrAllocateFreePage(tag);
+
+                assertEquals(relPtr, fromPool);
+
+                i++;
+
+                assertEquals(i, pool.size());
+
+                PageHeader.writeTimestamp(pool.absolute(relPtr), 0xFFFFFFFFFFFFFFFFL);
+            }
+        }
+    }
+
+    /**
+     * Checks that no page is handed out twice and no page is lost when pages are released and borrowed
+     * concurrently: first all pages are released from multiple threads, then all pages are borrowed from
+     * multiple threads, then threads randomly mix both operations.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testMultithreadedConsistency() throws Exception {
+        assertEquals(PAGES, pool.pages());
+        assertEquals(0, pool.size());
+
+        ConcurrentMap<Long, Long> allocated = new ConcurrentHashMap<>();
+
+        {
+            long relPtr;
+
+            while ((relPtr = pool.borrowOrAllocateFreePage(1)) != PageMemoryImpl.INVALID_REL_PTR) {
+                assertNull(allocated.put(relPtr, relPtr));
+
+                PageHeader.writeTimestamp(pool.absolute(relPtr), 0xFFFFFFFFFFFFFFFFL);
+            }
+        }
+
+        assertEquals(PAGES, pool.size());
+        assertEquals(PAGES, allocated.size());
+
+        // Concurrently release all pages.
+        GridTestUtils.runMultiThreaded(() -> {
+            while (!allocated.isEmpty()) {
+                Long polled = pollRandom(allocated);
+
+                if (polled != null)
+                    pool.releaseFreePage(polled);
+            }
+        }, Runtime.getRuntime().availableProcessors(), "load-runner");
+
+        assertTrue(allocated.isEmpty());
+        assertEquals(0, pool.size());
+
+        // Concurrently borrow all pages back; every page must be returned by the pool exactly once.
+        GridTestUtils.runMultiThreaded(() -> {
+            long polled;
+
+            while ((polled = pool.borrowOrAllocateFreePage(1)) != PageMemoryImpl.INVALID_REL_PTR) {
+                assertNull(allocated.put(polled, polled));
+
+                PageHeader.writeTimestamp(pool.absolute(polled), 0xFFFFFFFFFFFFFFFFL);
+            }
+        }, Runtime.getRuntime().availableProcessors(), "load-runner");
+
+        assertEquals(PAGES, pool.size());
+        assertEquals(PAGES, allocated.size());
+
+        // Mix releases and borrows; the preferred direction is switched when the number of borrowed pages
+        // leaves the [PAGES / 3, 2 * PAGES / 3] range and is inverted at random for one of three operations.
+        GridTestUtils.runMultiThreaded(() -> {
+            boolean toPool = true;
+
+            for (int i = 0; i < 10_000; i++) {
+                if (toPool) {
+                    if (allocated.size() < PAGES / 3) {
+                        toPool = false;
+
+                        log.info("Direction switched: " + toPool);
+                    }
+                }
+                else {
+                    if (allocated.size() > 2 * PAGES / 3) {
+                        toPool = true;
+
+                        log.info("Direction switched: " + toPool);
+                    }
+                }
+
+                boolean inverse = ThreadLocalRandom.current().nextInt(3) == 0;
+
+                if (toPool ^ inverse) {
+                    Long polled = pollRandom(allocated);
+
+                    if (polled != null)
+                        pool.releaseFreePage(polled);
+                }
+                else {
+                    long polled = pool.borrowOrAllocateFreePage(1);
+
+                    if (polled != PageMemoryImpl.INVALID_REL_PTR) {
+                        long abs = pool.absolute(polled);
+
+                        PageHeader.writeTimestamp(abs, 0xFFFFFFFFFFFFFFFFL);
+
+                        assertNull(allocated.put(polled, polled));
+                    }
+                }
+            }
+        }, Runtime.getRuntime().availableProcessors(), "load-runner");
+
+        {
+            long relPtr;
+
+            // Borrow whatever is left in the pool: together with the tracked pages it must give all the pages.
+            while ((relPtr = pool.borrowOrAllocateFreePage(1)) != PageMemoryImpl.INVALID_REL_PTR)
+                assertNull(allocated.put(relPtr, relPtr));
+
+            assertEquals(PAGES, allocated.size());
+            assertEquals(PAGES, pool.size());
+        }
+    }
+
+    /**
+     * @param allocated Map of allocated pages.
+     * @return Random page polled from the map or {@code null} if the map is empty or the chosen page
+     *      could not be removed from the map.
+     */
+    private Long pollRandom(ConcurrentMap<Long, Long> allocated) {
+        int size = allocated.size();
+
+        if (size == 0)
+            return null;
+
+        int cnt = ThreadLocalRandom.current().nextInt(size);
+
+        Iterator<Long> it = allocated.keySet().iterator();
+
+        for (int i = 0; i < cnt; i++) {
+            if (it.hasNext())
+                it.next();
+            else
+                break;
+        }
+
+        if (it.hasNext()) {
+            Long key = it.next();
+
+            if (allocated.remove(key) != null)
+                return key;
+        }
+
+        return null;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 7784602..7168e0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -128,6 +128,25 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks {@code U.nextPowerOf2(int)} for zero, for exact powers of 2 and for values between them.
+     */
+    @Test
+    public void testNextPowOf2() {
+        assertEquals(1, U.nextPowerOf2(0));
+        assertEquals(1, U.nextPowerOf2(1));
+        assertEquals(2, U.nextPowerOf2(2));
+        assertEquals(4, U.nextPowerOf2(3));
+        assertEquals(4, U.nextPowerOf2(4));
+
+        assertEquals(8, U.nextPowerOf2(5));
+        assertEquals(8, U.nextPowerOf2(6));
+        assertEquals(8, U.nextPowerOf2(7));
+        assertEquals(8, U.nextPowerOf2(8));
+
+        assertEquals(32768, U.nextPowerOf2(32767));
+    }
+
+    /**
      * @throws Exception If failed.
      */
     @Test
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 658172e..927dd71 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExc
 import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.DropCacheContextDuringEvictionTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictionTaskFailureHandlerTest;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagePoolTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.DiscoveryDataDeserializationFailureHanderTest;
 import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithSystemWorkerDeathTest;
@@ -206,6 +207,7 @@ import org.junit.runners.Suite;
     GridPeerDeploymentRetryModifiedTest.class,
 
     // Basic DB data structures.
+    PagePoolTest.class,
     BPlusTreeSelfTest.class,
     BPlusTreeFakeReuseSelfTest.class,
     BPlusTreeReuseSelfTest.class,
