GG-11044 - Implement page evictions from PageMemory for the case when data size is bigger than allocated memory.
Review fixes: - Replace BitSet with HashSet. - Remove array allocation. - Replace atomic write to volatile write for timestamp. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/36b0a20c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/36b0a20c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/36b0a20c Branch: refs/heads/ignite-db-x-10884 Commit: 36b0a20c5b2d9bde2b476ccd7412123b8c2f2255 Parents: 99915a6 Author: dkarachentsev <[email protected]> Authored: Mon Apr 25 10:30:40 2016 +0300 Committer: dkarachentsev <[email protected]> Committed: Mon Apr 25 10:30:40 2016 +0300 ---------------------------------------------------------------------- .../internal/pagemem/DirectMemoryUtils.java | 10 ++ .../ignite/internal/pagemem/impl/PageImpl.java | 8 +- .../internal/pagemem/impl/PageMemoryImpl.java | 109 +++++++++---------- 3 files changed, 65 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/36b0a20c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/DirectMemoryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/DirectMemoryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/DirectMemoryUtils.java index ee9ce08..4c06278 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/DirectMemoryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/DirectMemoryUtils.java @@ -86,4 +86,14 @@ public class DirectMemoryUtils { public boolean compareAndSwapLong(long ptr, long expVal, long newVal) { return GridUnsafe.compareAndSwapLong(null, ptr, expVal, newVal); } + + /** + * Volatile write long to the given memory pointer. + * + * @param ptr Memory pointer. + * @param val Value to write. + */ + public void writeLongVolatile(final long ptr, final long val) { + GridUnsafe.putLongVolatile(null, ptr, val); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/36b0a20c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java index 6e33572..28d4a19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageImpl.java @@ -141,7 +141,7 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { @Override public ByteBuffer getForRead() { acquireShared(1); - pageMem.atomicWriteCurrentTimestamp(ptr); + pageMem.writeCurrentTimestamp(ptr); return reset(buf.asReadOnlyBuffer()); } @@ -158,7 +158,7 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { markDirty(); - pageMem.atomicWriteCurrentTimestamp(ptr); + pageMem.writeCurrentTimestamp(ptr); return reset(buf); } @@ -172,7 +172,7 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { setExclusiveOwnerThread(th); - pageMem.atomicWriteCurrentTimestamp(ptr); + pageMem.writeCurrentTimestamp(ptr); } return reset(buf); @@ -238,7 +238,7 @@ class PageImpl extends AbstractQueuedSynchronizer implements Page { void acquireReference() { refCntUpd.incrementAndGet(this); - pageMem.atomicWriteCurrentTimestamp(ptr); + pageMem.writeCurrentTimestamp(ptr); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/36b0a20c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java index 5ee8bf3..5075f6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryImpl.java @@ -25,8 +25,10 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -231,7 +233,7 @@ public class PageMemoryImpl implements PageMemory { writePageId(absPtr, pageId); writePageCacheId(absPtr, cacheId); - atomicWriteCurrentTimestamp(absPtr); + writeCurrentTimestamp(absPtr); } // TODO pass an argument to decide whether the page should be cleaned. @@ -334,7 +336,7 @@ public class PageMemoryImpl implements PageMemory { long absPtr = absolute(relPtr); writeFullPageId(absPtr, fullId); - atomicWriteCurrentTimestamp(absPtr); + writeCurrentTimestamp(absPtr); // We can clear dirty flag after the page has been allocated. setDirty(fullId, absPtr, false); @@ -645,19 +647,21 @@ public class PageMemoryImpl implements PageMemory { dirtyPages.remove(pageId); } - void atomicWriteCurrentTimestamp(final long absPtr) { - while (true) { - final long readTs = readTimestamp(absPtr); - - if (mem.compareAndSwapLong(absPtr + PAGE_TIMESTAMP_OFFSET, readTs, U.currentTimeMillis())) - break; - } - } - - void writeTimestamp(final long absPtr, final long ts) { - mem.writeLong(absPtr + PAGE_TIMESTAMP_OFFSET, ts); + /** + * Volatile write for current timestamp to page in {@code absAddr} address. + * + * @param absPtr Absolute page address. + */ + void writeCurrentTimestamp(final long absPtr) { + mem.writeLongVolatile(absPtr + PAGE_TIMESTAMP_OFFSET, U.currentTimeMillis()); } + /** + * Read for timestamp from page in {@code absAddr} address. + * + * @param absPtr Absolute page address. + * @return Timestamp. + */ long readTimestamp(final long absPtr) { return mem.readLong(absPtr + PAGE_TIMESTAMP_OFFSET); } @@ -941,8 +945,6 @@ public class PageMemoryImpl implements PageMemory { private long evictPage(final Segment seg) throws IgniteCheckedException { final ThreadLocalRandom rnd = ThreadLocalRandom.current(); - final long[] pageRelAddrs = new long[RANDOM_PAGES_EVICT_NUM]; - final int cap = seg.loadedPages.capacity(); if (seg.acquiredPages.size() >= seg.loadedPages.size()) @@ -950,24 +952,48 @@ public class PageMemoryImpl implements PageMemory { // With big number of random picked pages we may fall into infinite loop, because // every time the same page may be found. - SimpleLongSet ignored = null; + Set<Long> ignored = null; + + long relEvictAddr = INVALID_REL_PTR; while (true) { + long cleanAddr = INVALID_REL_PTR; + long cleanTs = Long.MAX_VALUE; + long dirtyTs = Long.MAX_VALUE; + long dirtyAddr = INVALID_REL_PTR; + for (int i = 0; i < RANDOM_PAGES_EVICT_NUM; i++) { // We need to lookup for pages only in current segment for thread safety, // so peeking random memory will lead to checking for found page segment. // It's much faster to check available pages for segment right away. - final long addr = seg.loadedPages.getNearestAt(rnd.nextInt(cap), INVALID_REL_PTR); + final long rndAddr = seg.loadedPages.getNearestAt(rnd.nextInt(cap), INVALID_REL_PTR); - assert addr != INVALID_REL_PTR; + assert rndAddr != INVALID_REL_PTR; - if (ignored != null && ignored.contains(addr)) + if (relEvictAddr == rndAddr || (ignored != null && ignored.contains(rndAddr))) { i--; - else - pageRelAddrs[i] = addr; - } - final long relEvictAddr = findSuitablePageRelAddr(pageRelAddrs); + continue; + } + + final long absPageAddr = absolute(rndAddr); + + final long pageTs = readTimestamp(absPageAddr); + + final boolean dirty = isDirty(absPageAddr); + + if (pageTs < cleanTs && !dirty) { + cleanAddr = rndAddr; + + cleanTs = pageTs; + } else if (pageTs < dirtyTs && dirty) { + dirtyAddr = rndAddr; + + dirtyTs = pageTs; + } + + relEvictAddr = cleanAddr == INVALID_REL_PTR ? dirtyAddr : cleanAddr; + } assert relEvictAddr != INVALID_REL_PTR; @@ -979,7 +1005,7 @@ public class PageMemoryImpl implements PageMemory { if (fullPageId.pageId() == metaPageId && fullPageId.cacheId() == 0) { if (ignored == null) - ignored = new SimpleLongSet(); + ignored = new HashSet<>(); ignored.add(relEvictAddr); @@ -992,7 +1018,7 @@ public class PageMemoryImpl implements PageMemory { seg.loadedPages.remove(fullPageId); else { if (ignored == null) - ignored = new SimpleLongSet(); + ignored = new HashSet<>(); ignored.add(relEvictAddr); @@ -1011,39 +1037,6 @@ public class PageMemoryImpl implements PageMemory { } /** - * Find oldest and preferable not dirty page from passed ones. - * - * @param relAddrs Addresses to find from. - * @return The oldest and may be dirty page relative address. - */ - private long findSuitablePageRelAddr(final long[] relAddrs) { - long addr = INVALID_REL_PTR; - long ts = Long.MAX_VALUE; - long dirtyTs = Long.MAX_VALUE; - long dirtyAddr = INVALID_REL_PTR; - - for (final long relAddr : relAddrs) { - final long absPageAddr = absolute(relAddr); - - final long pageTs = readTimestamp(absPageAddr); - - final boolean dirty = isDirty(absPageAddr); - - if (pageTs < ts && !dirty) { - addr = relAddr; - - ts = pageTs; - } else if (pageTs < dirtyTs && dirty) { - dirtyAddr = relAddr; - - dirtyTs = pageTs; - } - } - - return addr == INVALID_REL_PTR ? dirtyAddr : addr; - } - - /** * @param relPtr Relative pointer to free. */ private void releaseFreePage(long relPtr) {
