GG-11044 - Implement page evictions from PageMemory for the case when data size is bigger than allocated memory.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2fedf6b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2fedf6b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2fedf6b Branch: refs/heads/ignite-db-x-10884 Commit: d2fedf6bb95bdd5f30a800959f9c9ff93d200bc7 Parents: 4282d80 Author: dkarachentsev <[email protected]> Authored: Thu Apr 21 19:15:04 2016 +0300 Committer: dkarachentsev <[email protected]> Committed: Thu Apr 21 19:15:04 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/pagemem/PageMemory.java | 3 +- .../internal/pagemem/impl/FullPageIdTable.java | 90 +++++++- .../ignite/internal/pagemem/impl/PageImpl.java | 9 + .../internal/pagemem/impl/PageMemoryImpl.java | 206 +++++++++++++++++-- 4 files changed, 287 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java index 0f10002..cf5ff69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java @@ -73,7 +73,8 @@ public interface PageMemory extends LifecycleAware, PageIdAllocator { * @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by * the {@link #beginCheckpoint()} method call. * @param tmpBuf Temporary buffer to write changes into. + * @return {@code True} if data were read, {@code false} otherwise (data already saved to storage). * @throws IgniteException If failed to obtain page data. */ - public void getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf); + public boolean getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java index e05d64c..c6529f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/FullPageIdTable.java @@ -78,6 +78,24 @@ public class FullPageIdTable { /** */ protected DirectMemoryUtils mem; + /** Addressing strategy. */ + private final AddressingStrategy strategy; + + /** Specifies types of addressing. */ + public enum AddressingStrategy { + /** + * Insertion will search for each available cell. + * Slower, but more suitable when used many removes/insertions. + */ + LINEAR, + + /** + * Insertion will search for available cell with limited steps. + * Faster, but requires more memory to resolve collisions. + */ + QUADRATIC + } + /** * @return Estimated memory size required for this map to store the given number of elements. */ @@ -93,11 +111,20 @@ public class FullPageIdTable { * @param len Allocated memory length. * @param clear If {@code true}, then memory is considered dirty and will be cleared. Otherwise, * map will assume that the given memory region is in valid state. + * @param stgy Addressing strategy {@link AddressingStrategy}. */ - public FullPageIdTable(DirectMemoryUtils mem, long addr, long len, boolean clear) { + public FullPageIdTable(DirectMemoryUtils mem, long addr, long len, boolean clear, AddressingStrategy stgy) { valPtr = addr; + this.strategy = stgy; capacity = (int)((len - 4) / BYTES_PER_ENTRY); - maxSteps = (int)Math.sqrt(capacity); + + if (stgy == AddressingStrategy.LINEAR) + maxSteps = capacity; + else if (stgy == AddressingStrategy.QUADRATIC) + maxSteps = (int) Math.sqrt(capacity); + else + throw new IllegalArgumentException("Unsupported addressing strategy: " + stgy); + this.mem = mem; if (clear) @@ -167,6 +194,24 @@ public class FullPageIdTable { } /** + * Find nearest value from specified position to the right. + * + * @param idx Index to start searching from. + * @param absent Default value that will be returned if no values present. + * @return Closest value to the index or {@code absent} if no values found. + */ + public long getNearestAt(final int idx, final long absent) { + for (int i = idx; i < capacity + idx; i++) { + final int idx2 = i >= capacity ? i - capacity : i; + + if (isValuePresentAt(idx2)) + return valueAt(idx2); + } + + return absent; + } + + /** * @param key Key. * @return Key index. */ @@ -178,7 +223,7 @@ public class FullPageIdTable { do { int res = testKeyAt(index, key); - if (res == EMPTY) { + if (res == EMPTY || res == REMOVED) { setKeyAt(index, key); incrementSize(); @@ -188,9 +233,14 @@ public class FullPageIdTable { else if (res == EQUAL) return index; else - assert res == REMOVED || res == NOT_EQUAL; + assert res == NOT_EQUAL; + + if (strategy == AddressingStrategy.QUADRATIC) + index += step; + else if (strategy == AddressingStrategy.LINEAR) + index++; - if ((index += step) >= capacity) + if (index >= capacity) index -= capacity; } while (++step <= maxSteps); @@ -217,7 +267,12 @@ public class FullPageIdTable { else assert res == REMOVED || res == NOT_EQUAL; - if ((index += step) >= capacity) + if (strategy == AddressingStrategy.QUADRATIC) + index += step; + else if (strategy == AddressingStrategy.LINEAR) + index++; + + if (index >= capacity) index -= capacity; } while (++step <= maxSteps); @@ -248,7 +303,12 @@ public class FullPageIdTable { else assert res == REMOVED || res == NOT_EQUAL; - if ((index += step) >= capacity) + if (strategy == AddressingStrategy.QUADRATIC) + index += step; + else if (strategy == AddressingStrategy.LINEAR) + index++; + + if (index >= capacity) index -= capacity; } while (++step <= maxSteps); @@ -278,6 +338,20 @@ public class FullPageIdTable { } /** + * @param idx Index to test. + * @return {@code True} if value set for index. + */ + private boolean isValuePresentAt(final int idx) { + long base = valPtr + 4 + (long)idx * BYTES_PER_ENTRY; + + long pageId = mem.readLong(base); + int cacheId = mem.readInt(base + 8); + + return !((pageId == REMOVED_PAGE_ID && cacheId == REMOVED_CACHE_ID) + || (pageId == EMPTY_PAGE_ID && cacheId == EMPTY_CACHE_ID)); + } + + /** * @param fullId Full page ID to check. * @return {@code True} if checks succeeded. */ @@ -333,6 +407,6 @@ public class FullPageIdTable { * */ private void decrementSize() { - mem.writeInt(valPtr, mem.readInt(valPtr) + 1); + mem.writeInt(valPtr, mem.readInt(valPtr) - 1); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java index 77f096d..6e33572 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java @@ -141,6 +141,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { @Override public ByteBuffer getForRead() { acquireShared(1); + pageMem.atomicWriteCurrentTimestamp(ptr); + return reset(buf.asReadOnlyBuffer()); } @@ -156,6 +158,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { markDirty(); + pageMem.atomicWriteCurrentTimestamp(ptr); + return reset(buf); } @@ -167,6 +171,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { acquire(1); setExclusiveOwnerThread(th); + + pageMem.atomicWriteCurrentTimestamp(ptr); } return reset(buf); @@ -211,6 +217,7 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { return pageMem.isDirty(ptr); } + /** {@inheritDoc} */ @Override public String toString() { SB sb = new SB("PageImpl [handle="); @@ -230,6 +237,8 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { */ void acquireReference() { refCntUpd.incrementAndGet(this); + + pageMem.atomicWriteCurrentTimestamp(ptr); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d2fedf6b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java index b8d58f6..62a4e72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; @@ -79,8 +80,20 @@ public class PageMemoryImpl implements PageMemory { /** Page ID offset */ public static final int PAGE_ID_OFFSET = 16; - /** Need a 8-byte pointer for linked list and 8 bytes for internal needs. */ - public static final int PAGE_OVERHEAD = 24; + /** Page cache ID offset. */ + public static final int PAGE_CACHE_ID_OFFSET = 24; + + /** Page access timestamp */ + public static final int PAGE_TIMESTAMP_OFFSET = 28; + + /** + * Need a 8-byte pointer for linked list, 8 bytes for internal needs (flags), + * 4 bytes cache ID, 8 bytes timestamp. + */ + public static final int PAGE_OVERHEAD = 36; + + /** Number of random pages that will be picked for eviction. */ + public static final int RANDOM_PAGES_EVICT_NUM = 5; /** Page size. */ private int sysPageSize; @@ -210,9 +223,14 @@ public class PageMemoryImpl implements PageMemory { relPtr = allocateFreePage(); + if (relPtr == INVALID_REL_PTR) + throw new OutOfMemoryException(); + absPtr = absolute(relPtr); writePageId(absPtr, pageId); + writePageCacheId(absPtr, cacheId); + atomicWriteCurrentTimestamp(absPtr); } // TODO pass an argument to decide whether the page should be cleaned. @@ -309,8 +327,14 @@ public class PageMemoryImpl implements PageMemory { relPtr = borrowOrAllocateFreePage(); + if (relPtr == INVALID_REL_PTR) + relPtr = evictPage(seg); + long absPtr = absolute(relPtr); + writeFullPageId(absPtr, fullId); + atomicWriteCurrentTimestamp(absPtr); + // We can clear dirty flag after the page has been allocated. setDirty(fullId, absPtr, false); @@ -373,7 +397,7 @@ public class PageMemoryImpl implements PageMemory { } /** {@inheritDoc} */ - @Override public void getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf) { + @Override public boolean getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf) { assert tmpBuf.remaining() == pageSize(); Segment seg = segment(pageId); @@ -386,15 +410,16 @@ public class PageMemoryImpl implements PageMemory { page = seg.acquiredPages.get(pageId); if (page != null) { - assert page.isDirty() : "Page is acquired for a checkpoint, but is not dirty: " + page; + if (!page.isDirty()) + return false; page.acquireReference(); } else { long relPtr = seg.loadedPages.get(pageId, INVALID_REL_PTR); - assert relPtr != INVALID_REL_PTR : "Failed to get page checkpoint data (page has been evicted) " + - "[pageId=" + pageId + ']'; + if (relPtr == INVALID_REL_PTR) + return false; long absPtr = absolute(relPtr); @@ -404,7 +429,7 @@ public class PageMemoryImpl implements PageMemory { setDirty(pageId, absPtr, false); - return; + return true; } } finally { @@ -428,6 +453,8 @@ public class PageMemoryImpl implements PageMemory { finally { releasePage(page); } + + return true; } /** @@ -529,6 +556,47 @@ public class PageMemoryImpl implements PageMemory { } /** + * Reads cache ID from the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @return Cache ID written to the page. + */ + int readPageCacheId(final long absPtr) { + return mem.readInt(absPtr + PAGE_CACHE_ID_OFFSET); + } + + /** + * Writes cache ID from the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @param cacheId Cache ID to write. + */ + void writePageCacheId(final long absPtr, final int cacheId) { + mem.writeInt(absPtr + PAGE_CACHE_ID_OFFSET, cacheId); + } + + /** + * Reads page ID and cache ID from the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @return Full page ID written to the page. + */ + FullPageId readFullPageId(final long absPtr) { + return new FullPageId(readPageId(absPtr), readPageCacheId(absPtr)); + } + + /** + * Writes page ID and cache ID from the page at the given absolute pointer. + * + * @param absPtr Absolute memory pointer to the page header. + * @param fullPageId Full page ID to write. + */ + void writeFullPageId(final long absPtr, final FullPageId fullPageId) { + writePageId(absPtr, fullPageId.pageId()); + writePageCacheId(absPtr, fullPageId.cacheId()); + } + + /** * @param absPtr Absolute pointer. * @return {@code True} if page is dirty. */ @@ -576,6 +644,23 @@ public class PageMemoryImpl implements PageMemory { dirtyPages.remove(pageId); } + void atomicWriteCurrentTimestamp(final long absPtr) { + while (true) { + final long readTs = readTimestamp(absPtr); + + if (mem.compareAndSwapLong(absPtr + PAGE_TIMESTAMP_OFFSET, readTs, U.currentTimeMillis())) + break; + } + } + + void writeTimestamp(final long absPtr, final long ts) { + mem.writeLong(absPtr + PAGE_TIMESTAMP_OFFSET, ts); + } + + long readTimestamp(final long absPtr) { + return mem.readLong(absPtr + PAGE_TIMESTAMP_OFFSET); + } + /** * Attempts to restore page memory state based on the memory chunks returned by the allocator. */ @@ -749,14 +834,14 @@ public class PageMemoryImpl implements PageMemory { /** * Requests next memory chunk from the system allocator. */ - private void requestNextChunk() { + private boolean requestNextChunk() { assert Thread.holdsLock(this); int curIdx = currentChunk.idx; // If current chunk is the last one, fail. if (curIdx == chunks.size() - 1) - throw new OutOfMemoryException(); + return false; Chunk chunk = chunks.get(curIdx + 1); @@ -765,6 +850,8 @@ public class PageMemoryImpl implements PageMemory { ", base=0x" + U.hexLong(chunk.fr.address()) + ", len=" + chunk.size() + ']'); currentChunk = chunk; + + return true; } /** @@ -834,8 +921,8 @@ public class PageMemoryImpl implements PageMemory { synchronized (this) { Chunk full = currentChunk; - if (chunk == full) - requestNextChunk(); + if (chunk == full && !requestNextChunk()) + return INVALID_REL_PTR; } } else @@ -844,6 +931,98 @@ public class PageMemoryImpl implements PageMemory { } /** + * Evict random oldest page from memory to storage. + * + * @param seg Currently locked segment. + * @return Relative addres for evicted page. + * @throws IgniteCheckedException + */ + private long evictPage(final Segment seg) throws IgniteCheckedException { + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + final long[] pageRelAddrs = new long[RANDOM_PAGES_EVICT_NUM]; + + final int cap = seg.loadedPages.capacity(); + + assert seg.loadedPages.size() >= RANDOM_PAGES_EVICT_NUM; + + if (seg.acquiredPages.size() >= seg.loadedPages.size()) + throw new OutOfMemoryException("No not acquired pages left for segment. Unable to evict."); + + while (true) { + for (int i = 0; i < RANDOM_PAGES_EVICT_NUM; i++) { + // We need to lookup for pages only in current segment for thread safety, + // so peeking random memory will lead to checking for found page segment. + // It's much faster to check available pages for segment right away. + final long addr = seg.loadedPages.getNearestAt(rnd.nextInt(cap), INVALID_REL_PTR); + + assert addr != INVALID_REL_PTR; + + pageRelAddrs[i] = addr; + } + + final long relEvictAddr = findSuitablePageRelAddr(pageRelAddrs); + + assert relEvictAddr != INVALID_REL_PTR; + + final long absEvictAddr = absolute(relEvictAddr); + + assert absEvictAddr != dbMetaPageIdPtr; + + final FullPageId fullPageId = readFullPageId(absEvictAddr); + + assert seg.writeLock().isHeldByCurrentThread(); + + if (!seg.acquiredPages.containsKey(fullPageId)) + seg.loadedPages.remove(fullPageId); + else + continue; + + // Force flush data and free page. + if (isDirty(absEvictAddr)) { + storeMgr.write(fullPageId.cacheId(), fullPageId.pageId(), wrapPointer(absEvictAddr + PAGE_OVERHEAD, pageSize())); + + setDirty(fullPageId, absEvictAddr, false); + } + + return relEvictAddr; + } + } + + /** + * Find oldest and preferable not dirty page from passed ones. + * + * @param relAddrs Addresses to find from. + * @return The oldest and may be dirty page relative address. + */ + private long findSuitablePageRelAddr(final long[] relAddrs) { + long addr = INVALID_REL_PTR; + long ts = Long.MAX_VALUE; + long dirtyTs = Long.MAX_VALUE; + long dirtyAddr = INVALID_REL_PTR; + + for (final long relAddr : relAddrs) { + final long absPageAddr = absolute(relAddr); + + final long pageTs = readTimestamp(absPageAddr); + + final boolean dirty = isDirty(absPageAddr); + + if (pageTs < ts && !dirty) { + addr = relAddr; + + ts = pageTs; + } else if (pageTs < dirtyTs && dirty) { + dirtyAddr = relAddr; + + dirtyTs = pageTs; + } + } + + return addr == INVALID_REL_PTR ? dirtyAddr : addr; + } + + /** * @param relPtr Relative pointer to free. */ private void releaseFreePage(long relPtr) { @@ -877,7 +1056,10 @@ public class PageMemoryImpl implements PageMemory { * @param len Length of the allocated memory. */ private Segment(long ptr, long len, boolean clear) { - loadedPages = new FullPageIdTable(mem, ptr, len, clear); + loadedPages = new FullPageIdTable(mem, ptr, len, clear, + storeMgr == null // if null evictions won't be used + ? FullPageIdTable.AddressingStrategy.QUADRATIC + : FullPageIdTable.AddressingStrategy.LINEAR); acquiredPages = new HashMap<>(16, 0.75f); }
