Repository: ignite Updated Branches: refs/heads/ignite-10508-pagememory [created] ca09cebcf
IGNITE-10508 page memory updates Signed-off-by: Dmitriy Govorukhin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca09cebc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca09cebc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca09cebc Branch: refs/heads/ignite-10508-pagememory Commit: ca09cebcf7ab7a655dbba7543b150868e2de13ab Parents: ca8a80b Author: Dmitriy Govorukhin <[email protected]> Authored: Wed Dec 19 14:46:19 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Wed Dec 19 14:46:19 2018 +0300 ---------------------------------------------------------------------- .../persistence/pagemem/PageMemoryImpl.java | 199 ++++++++++++++----- 1 file changed, 147 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ca09cebc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 5f16ce8..9de79cd 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -126,9 +126,12 @@ public class PageMemoryImpl implements PageMemoryEx { /** */ public static final long PAGE_MARKER = 0x0000000000000001L; - /** Relative pointer chunk index mask. */ + /** Relative pointer chunk segment index mask. */ private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L; + /** Relative pointer chunk checkpoint index mask. */ + private static final long CHECKPOINT_INDEX_MASK = 0xFF00000000000000L; + /** Full relative pointer mask. */ private static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL; @@ -879,15 +882,10 @@ public class PageMemoryImpl implements PageMemoryEx { if (rmv) seg.loadedPages.remove(grpId, PageIdUtils.effectivePageId(pageId)); - Collection<FullPageId> cpPages = seg.segCheckpointPages; - - if (cpPages != null) - cpPages.remove(new FullPageId(pageId, grpId)); - - Collection<FullPageId> dirtyPages = seg.dirtyPages; + FullPageId fullPageId = new FullPageId(pageId, grpId); - if (dirtyPages != null) - dirtyPages.remove(new FullPageId(pageId, grpId)); + seg.removeCheckpointPage(fullPageId); + seg.removeDirtyPage(fullPageId); return relPtr; } @@ -1050,15 +1048,21 @@ public class PageMemoryImpl implements PageMemoryEx { if (segments == null) return new GridMultiCollectionWrapper<>(Collections.<FullPageId>emptyList()); - Collection[] collections = new Collection[segments.length]; + Collection[] collections = new Collection[segments.length * 2]; for (int i = 0; i < segments.length; i++) { Segment seg = segments[i]; - if (seg.segCheckpointPages != null) - throw new IgniteException("Failed to begin checkpoint (it is already in progress)."); + // If curr != null - previous checkpoint was canceled. + if (seg.currChpPages != null) { + collections[(i * 2) + 1] = seg.prevChpPages = seg.currChpPages; - collections[i] = seg.segCheckpointPages = seg.dirtyPages; + seg.checkpointIdx++; + } + else + collections[(i * 2) + 1] = Collections.emptySet(); + + collections[(i * 2)] = seg.currChpPages = seg.dirtyPages; seg.dirtyPages = new GridConcurrentHashSet<>(); } @@ -1083,8 +1087,11 @@ public class PageMemoryImpl implements PageMemoryEx { if (segments == null) return; - for (Segment seg : segments) - seg.segCheckpointPages = null; + for (Segment seg : segments) { + seg.prevChpPages = null; + seg.currChpPages = null; + seg.checkpointIdx = 0; + } if (throttlingPlc != ThrottlingPolicy.DISABLED) writeThrottle.onFinishCheckpoint(); @@ -1212,7 +1219,11 @@ public class PageMemoryImpl implements PageMemoryEx { try { long tmpRelPtr = PageHeader.tempBufferPointer(absPtr); - boolean success = clearCheckpoint(fullId); + boolean pageCopyed = false; + + byte checkpointIdx = (byte)((tmpRelPtr & CHECKPOINT_INDEX_MASK) >> 56); + + boolean success = removeCheckpointPage(fullId); assert success : "Page was pin when we resolve abs pointer, it can not be evicted"; @@ -1221,22 +1232,31 @@ public class PageMemoryImpl implements PageMemoryEx { long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr); - copyInBuffer(tmpAbsPtr, outBuf); + if (checkpointIdx == currentCheckpointIdx(fullId)) { + copyInBuffer(tmpAbsPtr, outBuf); - GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte)0); + GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte)0); - if (tracker != null) - tracker.onCowPageWritten(); + if (tracker != null) + tracker.onCowPageWritten(); - checkpointPool.releaseFreePage(tmpRelPtr); + checkpointPool.releaseFreePage(tmpRelPtr); - // Need release again because we pin page when resolve abs pointer, - // and page did not have tmp buffer page. - if (!pageSingleAcquire) - PageHeader.releasePage(absPtr); + // Need release again because we pin page when resolve abs pointer, + // and page did not have tmp buffer page. + if (!pageSingleAcquire) + PageHeader.releasePage(absPtr); + + pageCopyed = true; + } + else { + GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte)0); + checkpointPool.releaseFreePage(tmpRelPtr); + } } - else { + + if (!pageCopyed) { copyInBuffer(absPtr, outBuf); PageHeader.dirty(absPtr, false); @@ -1470,6 +1490,10 @@ public class PageMemoryImpl implements PageMemoryEx { return locked ? postWriteLockPage(absPtr, fullId) : 0; } + private static long setCheckpointId(long tmpRelPtr, byte chpIdx) { + return (tmpRelPtr & ~CHECKPOINT_INDEX_MASK) & ((((int)chpIdx) << 56) | ~CHECKPOINT_INDEX_MASK); + } + /** * @param absPtr Absolute pointer. * @return Pointer to the page write buffer. @@ -1477,18 +1501,45 @@ public class PageMemoryImpl implements PageMemoryEx { private long postWriteLockPage(long absPtr, FullPageId fullId) { PageHeader.writeTimestamp(absPtr, U.currentTimeMillis()); + // Current checkpoint idx. + byte currChpIdx = currentCheckpointIdx(fullId); + // Create a buffer copy if the page is scheduled for a checkpoint. - if (isInCheckpoint(fullId) && PageHeader.tempBufferPointer(absPtr) == INVALID_REL_PTR) { - long tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(fullId.pageId()); + if (isInCheckpoint(fullId)) { + long tmpBufferPointer = PageHeader.tempBufferPointer(absPtr); + + long tmpRelPtr; + + // If buffer was not created befor. + if (tmpBufferPointer == INVALID_REL_PTR) { + tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(fullId.pageId()); - if (tmpRelPtr == INVALID_REL_PTR) { - rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS); + if (tmpRelPtr == INVALID_REL_PTR) { + rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS); - throw new IgniteException(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG + ": " + memMetrics.getName()); + throw new IgniteException(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG + ": " + memMetrics.getName()); + } + + tmpRelPtr = setCheckpointId(tmpRelPtr, currChpIdx); + + // Pin the page until checkpoint is not finished. + PageHeader.acquirePage(absPtr); } + else { + byte tmpBufferChpIdx = (byte)((tmpBufferPointer & CHECKPOINT_INDEX_MASK) >> 56); - // Pin the page until checkpoint is not finished. - PageHeader.acquirePage(absPtr); + // Check buffer checkpoint id and current checkpoint id. + // If they not equal we can rewrite page content with new page state. + if (currChpIdx != tmpBufferChpIdx) { + tmpRelPtr = setCheckpointId(tmpBufferPointer, currChpIdx); + } + else { + // We have buffer with alredy coppied page for this checkpoint. + assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480 + + return absPtr + PAGE_OVERHEAD; + } + } long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr); @@ -1581,29 +1632,33 @@ public class PageMemoryImpl implements PageMemoryEx { } /** - * @param pageId Page ID to check if it was added to the checkpoint list. + * @param fullPageId Page ID to check if it was added to the checkpoint list. * @return {@code True} if it was added to the checkpoint list. */ - boolean isInCheckpoint(FullPageId pageId) { - Segment seg = segment(pageId.groupId(), pageId.pageId()); - - Collection<FullPageId> pages0 = seg.segCheckpointPages; + boolean isInCheckpoint(FullPageId fullPageId) { + Segment seg = segment(fullPageId.groupId(), fullPageId.pageId()); - return pages0 != null && pages0.contains(pageId); + return seg.isInCheckpoint(fullPageId); } /** - * @param fullPageId Page ID to clear. - * @return {@code True} if remove successfully. + * @param fullPageId + * @return */ - boolean clearCheckpoint(FullPageId fullPageId) { + byte currentCheckpointIdx(FullPageId fullPageId){ Segment seg = segment(fullPageId.groupId(), fullPageId.pageId()); - Collection<FullPageId> pages0 = seg.segCheckpointPages; + return seg.currentCheckpointIdx(fullPageId); + } - assert pages0 != null; + /** + * @param fullPageId Page ID to check if it was added to the checkpoint list. + * @return {@code True} if it was added to the checkpoint list. + */ + boolean removeCheckpointPage(FullPageId fullPageId) { + Segment seg = segment(fullPageId.groupId(), fullPageId.pageId()); - return pages0.remove(fullPageId); + return seg.removeCheckpointPage(fullPageId); } /** @@ -1921,7 +1976,13 @@ public class PageMemoryImpl implements PageMemoryEx { private volatile Collection<FullPageId> dirtyPages = new GridConcurrentHashSet<>(); /** */ - private volatile Collection<FullPageId> segCheckpointPages; + private byte checkpointIdx; + + /** */ + private volatile Collection<FullPageId> prevChpPages; + + /** */ + private volatile Collection<FullPageId> currChpPages; /** */ private final int maxDirtyPages; @@ -1986,6 +2047,44 @@ public class PageMemoryImpl implements PageMemoryEx { /** * */ + private byte currentCheckpointIdx(FullPageId fullPageId) { + return checkpointIdx; + } + + /** + * + */ + private boolean removeCheckpointPage(FullPageId fullPageId) { + Collection<FullPageId> prevChpPages0 = prevChpPages; + Collection<FullPageId> currChpPages0 = currChpPages; + + return (prevChpPages0 != null && prevChpPages0.remove(fullPageId)) | + (currChpPages0 != null && currChpPages0.remove(fullPageId)); + } + + /** + * + */ + private boolean removeDirtyPage(FullPageId fullPageId) { + Collection<FullPageId> dirtyPages0 = dirtyPages; + + return dirtyPages0 != null && dirtyPages0.remove(fullPageId); + } + + /** + * + */ + private boolean isInCheckpoint(FullPageId fullPageId) { + Collection<FullPageId> prevChpPages0 = prevChpPages; + Collection<FullPageId> currChpPages0 = currChpPages; + + return (prevChpPages0 != null && prevChpPages0.contains(fullPageId)) || + (currChpPages0 != null && currChpPages0.contains(fullPageId)); + } + + /** + * + */ private boolean safeToUpdate() { return dirtyPages.size() < maxDirtyPages; } @@ -2070,14 +2169,12 @@ public class PageMemoryImpl implements PageMemoryEx { if (PageHeader.isAcquired(absPtr)) return false; - Collection<FullPageId> cpPages = segCheckpointPages; - clearRowCache(fullPageId, absPtr); if (isDirty(absPtr)) { // Can evict a dirty page only if should be written by a checkpoint. // These pages does not have tmp buffer. - if (cpPages != null && cpPages.contains(fullPageId)) { + if (removeCheckpointPage(fullPageId)) { assert storeMgr != null; memMetrics.updatePageReplaceRate(U.currentTimeMillis() - PageHeader.readTimestamp(absPtr)); @@ -2093,8 +2190,6 @@ public class PageMemoryImpl implements PageMemoryEx { setDirty(fullPageId, absPtr, false, true); - cpPages.remove(fullPageId); - return true; } @@ -2380,7 +2475,7 @@ public class PageMemoryImpl implements PageMemoryEx { ", loaded=" + loadedPages.size() + ", maxDirtyPages=" + maxDirtyPages + ", dirtyPages=" + dirtyPages.size() + - ", cpPages=" + (segCheckpointPages == null ? 0 : segCheckpointPages.size()) + + ", cpPages=" + (currChpPages == null ? 0 : currChpPages.size()) + ", pinnedInSegment=" + pinnedCnt + ", failedToPrepare=" + failToPrepare + ']' + U.nl() + "Out of memory in data region [" +
