Repository: ignite Updated Branches: refs/heads/ignite-3477-gc-pressure [created] 879959610
removed page impl instantiation on putall Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87995961 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87995961 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87995961 Branch: refs/heads/ignite-3477-gc-pressure Commit: 8799596100680efdef7edeb7093cf064106faa4b Parents: 13c3cfc Author: Yakov Zhdanov <[email protected]> Authored: Thu Mar 9 15:57:44 2017 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Thu Mar 9 15:57:44 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/pagemem/PageMemory.java | 57 ++++++++ .../pagemem/impl/PageMemoryNoStoreImpl.java | 82 ++++++++++++ .../cache/CacheOffheapEvictionManager.java | 3 +- .../cache/IgniteCacheOffheapManagerImpl.java | 69 +++++----- .../cache/database/CacheDataRowAdapter.java | 55 ++++---- .../cache/database/freelist/FreeListImpl.java | 36 +++-- .../cache/database/freelist/PagesList.java | 9 +- .../cache/database/tree/BPlusTree.java | 115 ++++++++++++---- .../cache/database/tree/util/PageHandler.java | 134 ++++++++++++++++--- .../atomic/GridNearAtomicFullUpdateRequest.java | 7 +- .../cache/version/GridCacheVersionManager.java | 5 +- 11 files changed, 448 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java index 6333ff9..10d45c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java @@ -36,12 +36,69 @@ public interface PageMemory extends LifecycleAware, PageIdAllocator { public Page page(int cacheId, long pageId) throws IgniteCheckedException; /** + * Gets the page handle associated with the given page ID. Each page obtained with this method must be released by + * calling {@link #releasePage(long)}. This method will allocate page with given ID if it doesn't exist. + * + * @param cacheId Cache ID. + * @param pageId Page ID. + * @return Page handle. + * @throws IgniteCheckedException If failed. + */ + public long pageHandle(int cacheId, long pageId) throws IgniteCheckedException; + + /** + * @param pageHandle Page handle. + * @param pageId Page ID. + * @return Pointer for reading the page. + */ + public long getForReadPointer(long pageHandle, long pageId); + + /** + * Releases reserved page. Released page can be evicted from RAM after flushing modifications to disk. + * + * @param pageHandle Page handle. + */ + public void releaseRead(long pageHandle); + + /** + * @param pageHandle Page handle. + * @param pageId Page ID. + * @return ByteBuffer for modifying the page. + */ + public long getForWritePointer(long pageHandle, long pageId); + + /** + * @param pageHandle Page handle. + * @param pageId Page ID. + * @return ByteBuffer for modifying the page of {@code null} if failed to get write lock. + */ + public long tryGetForWritePointer(long pageHandle, long pageId); + + /** + * Releases reserved page. Released page can be evicted from RAM after flushing modifications to disk. + * @param pageHandle Page handle. + */ + public void releaseWrite(long pageHandle, boolean markDirty); + + /** + * @param pageHandle Page handle. + * @return {@code True} if the page was modified since the last checkpoint. + */ + public boolean isDirty(long pageHandle); + + /** * @param page Page to release. * @throws IgniteCheckedException If failed. */ public void releasePage(Page page) throws IgniteCheckedException; /** + * @param pageHandler Page to release. + * @throws IgniteCheckedException If failed. + */ + public void releasePage(long pageHandler) throws IgniteCheckedException; + + /** * @return Page size in bytes. */ public int pageSize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java index 05fce3d..dd8f441 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.OffheapReadWriteLock; import org.apache.ignite.internal.util.offheap.GridOffHeapOutOfMemoryException; @@ -278,6 +279,72 @@ public class PageMemoryNoStoreImpl implements PageMemory { } /** {@inheritDoc} */ + @Override public long pageHandle( + int cacheId, + long pageId + ) throws IgniteCheckedException { + int pageIdx = PageIdUtils.pageIndex(pageId); + + Segment seg = segment(pageIdx); + + return seg.acquirePageHandle(pageIdx); + } + + /** {@inheritDoc} */ + @Override public long getForReadPointer(long pageHandle, long pageId) { + if (readLockPage(pageHandle, PageIdUtils.tag(pageId))) + return pageHandle + PAGE_OVERHEAD; + + return 0L; + } + + /** {@inheritDoc} */ + @Override public void releaseRead(long pageHandle) { + readUnlockPage(pageHandle); + } + + /** {@inheritDoc} */ + @Override public long getForWritePointer(long pageHandle, long pageId) { + if (writeLockPage(pageHandle, PageIdUtils.tag(pageId))) + return pageHandle + PAGE_OVERHEAD; + + return 0L; + } + + /** {@inheritDoc} */ + @Override public long tryGetForWritePointer(long pageHandle, long pageId) { + int tag = PageIdUtils.tag(pageId); + + if (tryWriteLockPage(pageHandle, tag)) + return pageHandle + PAGE_OVERHEAD; + + return 0L; + } + + @Override public void releaseWrite( + long pageHandle, + boolean markDirty + ) { + long updatedPageId = PageIO.getPageId(pageHandle + PAGE_OVERHEAD); + + writeUnlockPage(pageHandle, PageIdUtils.tag(updatedPageId)); + } + + @Override public boolean isDirty(long pageHandle) { + // TODO - always false for NOSTORE + return false; + } + + @Override public void releasePage(long pageHandler) throws IgniteCheckedException { + // TODO +// if (trackAcquiredPages) { +// Segment seg = segment(PageIdUtils.pageIndex(p.id())); +// +// seg.onPageRelease(); +// } + } + + /** {@inheritDoc} */ @Override public void releasePage(Page p) { if (trackAcquiredPages) { Segment seg = segment(PageIdUtils.pageIndex(p.id())); @@ -532,6 +599,21 @@ public class PageMemoryNoStoreImpl implements PageMemory { } /** + * @param pageIdx Page index. + * @return Page handle. + */ + private long acquirePageHandle(int pageIdx) { + long absPtr = absolute(pageIdx); + + assert absPtr % 8 == 0 : absPtr; + + if (trackAcquiredPages) + acquiredPages.incrementAndGet(); + + return absPtr; + } + + /** */ private void onPageRelease() { acquiredPages.decrementAndGet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java index 6c925ad..e6a9ee7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java @@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -50,7 +51,7 @@ public class CacheOffheapEvictionManager extends GridCacheManagerAdapter impleme return; } - boolean evicted = e.evictInternal(cctx.versions().next(), null); + boolean evicted = e.evictInternal(GridCacheVersionManager.EVICT_VER, null); if (evicted) cctx.cache().removeEntry(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index f47d75e..f87d529 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1462,58 +1462,59 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple PageMemory pageMem = cctx.shared().database().pageMemory(); - try (Page page = page(pageId(link))) { - long pageAddr = page.getForReadPointer(); // Non-empty data page must not be recycled. + long pageId = pageId(link); + long pagePtr = pageMem.pageHandle(cacheId, pageId); - assert pageAddr != 0L : link; + long pageAddr = pageMem.getForReadPointer(pagePtr, pageId); // Non-empty data page must not be recycled. - try { - DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + assert pageAddr != 0L : link; - DataPagePayload data = io.readPayload(pageAddr, - itemId(link), - pageMem.pageSize()); + try { + DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); - if (data.nextLink() == 0) { - long addr = pageAddr + data.offset(); + DataPagePayload data = io.readPayload(pageAddr, + itemId(link), + pageMem.pageSize()); - final int len = PageUtils.getInt(addr, 0); + if (data.nextLink() == 0) { + long addr = pageAddr + data.offset(); - int lenCmp = Integer.compare(len, bytes.length); + final int len = PageUtils.getInt(addr, 0); - if (lenCmp != 0) - return lenCmp; + int lenCmp = Integer.compare(len, bytes.length); - addr += 5; // Skip length and type byte. + if (lenCmp != 0) + return lenCmp; - final int words = len / 8; + addr += 5; // Skip length and type byte. - for (int i = 0; i < words; i++) { - int off = i * 8; + final int words = len / 8; - long b1 = PageUtils.getLong(addr, off); - long b2 = GridUnsafe.getLong(bytes, GridUnsafe.BYTE_ARR_OFF + off); + for (int i = 0; i < words; i++) { + int off = i * 8; - int cmp = Long.compare(b1, b2); + long b1 = PageUtils.getLong(addr, off); + long b2 = GridUnsafe.getLong(bytes, GridUnsafe.BYTE_ARR_OFF + off); - if (cmp != 0) - return cmp; - } + int cmp = Long.compare(b1, b2); - for (int i = words * 8; i < len; i++) { - byte b1 = PageUtils.getByte(addr, i); - byte b2 = bytes[i]; + if (cmp != 0) + return cmp; + } - if (b1 != b2) - return b1 > b2 ? 1 : -1; - } + for (int i = words * 8; i < len; i++) { + byte b1 = PageUtils.getByte(addr, i); + byte b2 = bytes[i]; - return 0; + if (b1 != b2) + return b1 > b2 ? 1 : -1; } + + return 0; } - finally { - page.releaseRead(); - } + } + finally { + pageMem.releaseRead(pagePtr); } // TODO GG-11768. http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index 5a62e75..67a913c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -106,46 +106,47 @@ public class CacheDataRowAdapter implements CacheDataRow { do { PageMemory pageMem = cctx.shared().database().pageMemory(); - try (Page page = page(pageId(nextLink), cctx)) { - long pageAddr = page.getForReadPointer(); // Non-empty data page must not be recycled. + long pageId = pageId(nextLink); + long pagePtr = pageMem.pageHandle(cctx.cacheId(), pageId); - assert pageAddr != 0L : nextLink; + long pageAddr = pageMem.getForReadPointer(pagePtr, pageId); // Non-empty data page must not be recycled. - try { - DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + assert pageAddr != 0L : nextLink; - DataPagePayload data = io.readPayload(pageAddr, - itemId(nextLink), - pageMem.pageSize()); + try { + DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); - nextLink = data.nextLink(); + DataPagePayload data = io.readPayload(pageAddr, + itemId(nextLink), + pageMem.pageSize()); - if (first) { - if (nextLink == 0) { - // Fast path for a single page row. - readFullRow(coctx, pageAddr + data.offset(), rowData); + nextLink = data.nextLink(); - return; - } + if (first) { + if (nextLink == 0) { + // Fast path for a single page row. + readFullRow(coctx, pageAddr + data.offset(), rowData); - first = false; + return; } - ByteBuffer buf = pageMem.pageBuffer(pageAddr); + first = false; + } - buf.position(data.offset()); - buf.limit(data.offset() + data.payloadSize()); + ByteBuffer buf = pageMem.pageBuffer(pageAddr); - boolean keyOnly = rowData == RowData.KEY_ONLY; + buf.position(data.offset()); + buf.limit(data.offset() + data.payloadSize()); - incomplete = readFragment(coctx, buf, keyOnly, incomplete); + boolean keyOnly = rowData == RowData.KEY_ONLY; - if (keyOnly && key != null) - return; - } - finally { - page.releaseRead(); - } + incomplete = readFragment(coctx, buf, keyOnly, incomplete); + + if (keyOnly && key != null) + return; + } + finally { + pageMem.releaseRead(pagePtr); } } while(nextLink != 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java index d6debd8..257a082 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java @@ -80,7 +80,14 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private class UpdateRowHandler extends PageHandler<CacheDataRow, Boolean> { /** {@inheritDoc} */ - @Override public Boolean run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int itemId) + @Override public Boolean run( + Page page, + PageIO iox, + long pageId, + long pageAddr, + CacheDataRow row, + int itemId + ) throws IgniteCheckedException { DataPageIO io = (DataPageIO)iox; @@ -117,7 +124,14 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private class WriteRowHandler extends PageHandler<CacheDataRow, Integer> { /** {@inheritDoc} */ - @Override public Integer run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int written) + @Override public Integer run( + Page page, + PageIO iox, + long pageId, + long pageAddr, + CacheDataRow row, + int written + ) throws IgniteCheckedException { DataPageIO io = (DataPageIO)iox; @@ -229,7 +243,14 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private class RemoveRowHandler extends PageHandler<Void, Long> { /** {@inheritDoc} */ - @Override public Long run(Page page, PageIO iox, long pageAddr, Void arg, int itemId) + @Override public Long run( + Page page, + PageIO iox, + long pageId, + long pageAddr, + Void arg, + int itemId + ) throws IgniteCheckedException { DataPageIO io = (DataPageIO)iox; @@ -435,14 +456,13 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { long pageId = PageIdUtils.pageId(link); int itemId = PageIdUtils.itemId(link); + long pagePtr = pageMem.pageHandle(cacheId, pageId); - try (Page page = pageMem.page(cacheId, pageId)) { - Boolean updated = writePage(pageMem, page, this, updateRow, row, itemId, null); + Boolean updated = writePage(pageMem, pagePtr, pageId, this, updateRow, null, null, row, itemId, null); - assert updated != null; // Can't fail here. + assert updated != null; // Can't fail here. - return updated != null ? updated : false; - } + return updated != null ? updated : false; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java index e5430cf..6cbc199 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java @@ -98,7 +98,14 @@ public abstract class PagesList extends DataStructure { */ private class CutTail extends PageHandler<Void, Boolean> { /** {@inheritDoc} */ - @Override public Boolean run(Page page, PageIO pageIo, long pageAddr, Void ignore, int bucket) + @Override public Boolean run( + Page page, + PageIO pageIo, + long pageId, + long pageAddr, + Void ignore, + int bucket + ) throws IgniteCheckedException { assert getPageId(pageAddr) == page.id(); http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java index 9597f87..835736f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java @@ -593,7 +593,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private class CutRoot extends PageHandler<Void, Bool> { /** {@inheritDoc} */ - @Override public Bool run(Page meta, PageIO iox, long pageAddr, Void ignore, int lvl) + @Override public Bool run( + Page meta, + PageIO iox, + long pageId, + long pageAddr, + Void ignore, + int lvl + ) throws IgniteCheckedException { // Safe cast because we should never recycle meta page until the tree is destroyed. BPlusMetaIO io = (BPlusMetaIO)iox; @@ -623,7 +630,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private class AddRoot extends PageHandler<Long, Bool> { /** {@inheritDoc} */ - @Override public Bool run(Page meta, PageIO iox, long pageAddr, Long rootPageId, int lvl) + @Override public Bool run( + Page meta, + PageIO iox, + long pageId, + long pageAddr, + Long rootPageId, + int lvl + ) throws IgniteCheckedException { assert rootPageId != null; @@ -654,7 +668,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private class InitRoot extends PageHandler<Long, Bool> { /** {@inheritDoc} */ - @Override public Bool run(Page meta, PageIO iox, long pageAddr, Long rootId, int inlineSize) + @Override public Bool run( + Page meta, + PageIO iox, + long pageId, + long pageAddr, + Long rootId, + int inlineSize + ) throws IgniteCheckedException { assert rootId != null; @@ -1049,7 +1070,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } } finally { - if (g.canRelease(page, lvl)) + if (g.canRelease(page, lvl, pageId)) page.close(); } } @@ -1512,7 +1533,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements if (x.isTail(pageId, lvl)) return FOUND; // We've already locked this page, so return that we are ok. - final Page page = page(pageId); + long pageHandle = pageMem.pageHandle(cacheId, pageId); try { for (;;) { @@ -1521,7 +1542,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements x.fwdId(fwdId); x.backId(backId); - Result res = readPage(page, this, search, x, lvl, RETRY); + Result res = readPage(pageHandle, pageId, pageMem, search, x, lvl, RETRY); switch (res) { case GO_DOWN_X: @@ -1540,7 +1561,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements // Intentional fallthrough. case GO_DOWN: - res = x.tryReplaceInner(page, pageId, fwdId, lvl); + res = x.tryReplaceInner(null, pageId, fwdId, lvl); if (res != RETRY) res = invokeDown(x, x.pageId, x.backId, x.fwdId, lvl - 1); @@ -1560,7 +1581,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements assert x.isRemove(); // Guarded by isFinished. - res = x.finishOrLockTail(page, pageId, backId, fwdId, lvl); + res = x.finishOrLockTail(null, pageId, backId, fwdId, lvl); return res; @@ -1568,13 +1589,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements if (lvl == 0) x.invokeClosure(); - return x.onNotFound(page, pageId, fwdId, lvl); + return x.onNotFound(null, pageId, fwdId, lvl); case FOUND: if (lvl == 0) x.invokeClosure(); - return x.onFound(page, pageId, backId, fwdId, lvl); + return x.onFound(null, pageId, backId, fwdId, lvl); default: return res; @@ -1584,8 +1605,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements finally { x.levelExit(); - if (x.canRelease(page, lvl)) - page.close(); + if (x.canRelease(null, lvl, pageId)) + pageMem.releasePage(pageHandle); } } @@ -1728,7 +1749,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements finally { r.page = null; - if (r.canRelease(page, lvl)) + if (r.canRelease(page, lvl, -1)) page.close(); } } @@ -2146,7 +2167,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } } finally { - if (p.canRelease(page, lvl)) + if (p.canRelease(page, lvl, -1)) page.close(); } } @@ -2291,9 +2312,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param page Page. * @param lvl Level. + * @param pageId Page ID. * @return {@code true} If we can release the given page. */ - boolean canRelease(Page page, int lvl) { + boolean canRelease( + Page page, + int lvl, + long pageId + ) { return page != null; } @@ -2473,8 +2499,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** {@inheritDoc} */ - @Override boolean canRelease(Page page, int lvl) { - return page != null && tail != page; + @Override boolean canRelease( + Page page, + int lvl, + long pageId + ) { + return (page != null && tail != page) || + (pageId != -1 && (tail == null || tail.id() != pageId)); } /** @@ -2694,7 +2725,17 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements this.pageId = pageId; this.fwdId = fwdId; - return writePage(pageMem, page, BPlusTree.this, insert, this, lvl, RETRY); + return writePage( + pageMem, + pageMem.pageHandle(cacheId, pageId), + pageId, + BPlusTree.this, + insert, + null, + null, + this, + lvl, + RETRY); } /** @@ -2855,14 +2896,18 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** {@inheritDoc} */ - @Override boolean canRelease(Page page, int lvl) { + @Override boolean canRelease( + Page page, + int lvl, + long pageId + ) { if (page == null) return false; if (op == null) return true; - return op.canRelease(page, lvl); + return op.canRelease(page, lvl, pageId); } /** @@ -3365,7 +3410,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return writePage(pageMem, back, BPlusTree.this, lockBackAndRmvFromLeaf, this, 0, RETRY); } finally { - if (canRelease(back, 0)) + if (canRelease(back, 0, -1)) back.close(); } } @@ -3419,7 +3464,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return writePage(pageMem, back, BPlusTree.this, lockBackAndTail, this, lvl, RETRY); } finally { - if (canRelease(back, lvl)) + if (canRelease(back, lvl, -1)) back.close(); } } @@ -3440,7 +3485,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } finally { // If we were not able to lock forward page as tail, release the page. - if (canRelease(fwd, lvl)) + if (canRelease(fwd, lvl, -1)) fwd.close(); } } @@ -3808,7 +3853,11 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** {@inheritDoc} */ - @Override boolean canRelease(Page page, int lvl) { + @Override boolean canRelease( + Page page, + int lvl, + long pageId + ) { return page != null && !isTail(page.id(), lvl); } @@ -4460,7 +4509,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements private abstract class GetPageHandler<G extends Get> extends PageHandler<G, Result> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public final Result run(Page page, PageIO iox, long pageAddr, G g, int lvl) + @Override public final Result run( + Page page, + PageIO iox, + long pageId, + long pageAddr, + G g, + int lvl + ) throws IgniteCheckedException { assert PageIO.getPageId(pageAddr) == page.id(); @@ -4488,8 +4544,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements throws IgniteCheckedException; /** {@inheritDoc} */ - @Override public final boolean releaseAfterWrite(Page page, G g, int lvl) { - return g.canRelease(page, lvl); + @Override public final boolean releaseAfterWrite( + Page page, + long pageId, + G g, + int lvl + ) { + return g.canRelease(page, lvl, pageId); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler.java index 97b5a04..ff527e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler.java @@ -22,7 +22,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; import org.apache.ignite.internal.util.GridUnsafe; @@ -35,7 +34,14 @@ import static java.lang.Boolean.TRUE; public abstract class PageHandler<X, R> { /** */ private static final PageHandler<Void, Boolean> NOOP = new PageHandler<Void, Boolean>() { - @Override public Boolean run(Page page, PageIO io, long pageAddr, Void arg, int intArg) + @Override public Boolean run( + Page page, + PageIO io, + long pageId, + long pageAddr, + Void arg, + int intArg + ) throws IgniteCheckedException { return TRUE; } @@ -44,22 +50,34 @@ public abstract class PageHandler<X, R> { /** * @param page Page. * @param io IO. + * @param pageId * @param pageAddr Page address. * @param arg Argument. - * @param intArg Argument of type {@code int}. - * @return Result. + * @param intArg Argument of type {@code int}. @return Result. * @throws IgniteCheckedException If failed. */ - public abstract R run(Page page, PageIO io, long pageAddr, X arg, int intArg) + public abstract R run( + Page page, + PageIO io, + long pageId, + long pageAddr, + X arg, + int intArg + ) throws IgniteCheckedException; /** * @param page Page. + * @param pageId Page ID. * @param arg Argument. - * @param intArg Argument of type {@code int}. - * @return {@code true} If release. + * @param intArg Argument of type {@code int}. @return {@code true} If release. */ - public boolean releaseAfterWrite(Page page, X arg, int intArg) { + public boolean releaseAfterWrite( + Page page, + long pageId, + X arg, + int intArg + ) { return true; } @@ -89,13 +107,42 @@ public abstract class PageHandler<X, R> { try { PageIO io = PageIO.getPageIO(pageAddr); - return h.run(page, io, pageAddr, arg, intArg); + return h.run(page, io, + -1, + pageAddr, arg, intArg); } finally { readUnlock(page, pageAddr, lockLsnr); } } + // TODO + public static <X, R> R readPage( + long pageHandle, + long pageId, + PageMemory pageMem, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed + ) throws IgniteCheckedException { + long pageAddr = pageMem.getForReadPointer(pageHandle, pageId); + + if (pageAddr == 0L) + return lockFailed; + + try { + PageIO io = PageIO.getPageIO(pageAddr); + + return h.run(null, io, + pageId, + pageAddr, arg, intArg); + } + finally { + pageMem.releaseRead(pageHandle); + } + } + /** * @param pageMem Page memory. * @param page Page. @@ -227,27 +274,76 @@ public abstract class PageHandler<X, R> { try { if (init != null) // It is a new page and we have to initialize it. - doInitPage(pageMem, page, pageAddr, init, wal); + doInitPage(pageMem, page.id(), pageAddr, init, wal); else init = PageIO.getPageIO(pageAddr); - res = h.run(page, init, pageAddr, arg, intArg); + res = h.run(page, init, + -1, + pageAddr, arg, intArg); ok = true; } finally { assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 - if (h.releaseAfterWrite(page, arg, intArg)) + if (h.releaseAfterWrite(page, + -1, + arg, intArg)) writeUnlock(page, pageAddr, lockLsnr, ok); } return res; } + public static <X, R> R writePage( + PageMemory pageMem, + long pageHandle, + long pageId, + PageLockListener lockLsnr, + PageHandler<X, R> h, + PageIO init, + IgniteWriteAheadLogManager wal, + X arg, + int intArg, + R lockFailed + ) throws IgniteCheckedException { + long pageAddr = pageMem.getForWritePointer(pageHandle, pageId); + + if (pageAddr == 0L) + return lockFailed; + + R res; + + boolean ok = false; + + try { + if (init != null) // It is a new page and we have to initialize it. + doInitPage(pageMem, pageId, pageAddr, init, wal); + else + init = PageIO.getPageIO(pageAddr); + + res = h.run(null, init, + pageId, + pageAddr, arg, intArg); + + ok = true; + } + finally { + assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 + + if (h.releaseAfterWrite(null, + pageId, + arg, intArg)) + pageMem.releaseWrite(pageHandle, ok); + } + + return res; + } + /** * @param pageMem Page memory. - * @param page Page. +// * @param page Page. * @param pageAddr Page address. * @param init Initial IO. * @param wal Write ahead log. @@ -255,23 +351,23 @@ public abstract class PageHandler<X, R> { */ private static void doInitPage( PageMemory pageMem, - Page page, + long pageId, long pageAddr, PageIO init, IgniteWriteAheadLogManager wal ) throws IgniteCheckedException { assert PageIO.getCrc(pageAddr) == 0; //TODO GG-11480 - long pageId = page.id(); +// long pageId = page.id(); init.initNewPage(pageAddr, pageId, pageMem.pageSize()); // Here we should never write full page, because it is known to be new. - page.fullPageWalRecordPolicy(FALSE); + //page.fullPageWalRecordPolicy(FALSE); TODO - if (isWalDeltaRecordNeeded(wal, page)) - wal.log(new InitNewPageRecord(page.fullId().cacheId(), page.id(), - init.getType(), init.getVersion(), pageId)); +// if (isWalDeltaRecordNeeded(wal, page)) +// wal.log(new InitNewPageRecord(page.fullId().cacheId(), page.id(), +// init.getType(), init.getVersion(), pageId)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index 84c2109..5f67f95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -244,11 +243,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat this.clientReq = clientReq; this.addDepInfo = addDepInfo; - // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries - // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys - // participate in request. As such, we know upper bound of all collections in request. If this bound is lower - // than 10, we use it. - initSize = Math.min(maxEntryCnt, 10); + initSize = maxEntryCnt; keys = new ArrayList<>(initSize); http://git-wip-us.apache.org/repos/asf/ignite/blob/87995961/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 9be8b50..fe5f562 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -38,6 +38,9 @@ import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; * caches. */ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { + /** */ + public static final GridCacheVersion EVICT_VER = new GridCacheVersion(Integer.MAX_VALUE, 0, 0, 0); + /** Timestamp used as base time for cache topology version (January 1, 2014). */ public static final long TOP_VER_BASE_TIME = 1388520000000L; @@ -304,4 +307,4 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { public GridCacheVersion last() { return last; } -} \ No newline at end of file +}
