ignite-3477 Reuse list was not used on stripe add; minor optimization for empty stripes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db548dc2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db548dc2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db548dc2 Branch: refs/heads/ignite-4652 Commit: db548dc27d3d6d2efffb678be8cb885283f27789 Parents: 0314dec Author: sboikov <[email protected]> Authored: Wed Feb 15 10:39:52 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Feb 15 10:39:52 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 2 + .../apache/ignite/internal/pagemem/Page.java | 2 +- .../ignite/internal/pagemem/PageIdUtils.java | 2 +- .../IgniteCacheDatabaseSharedManager.java | 9 + .../cache/database/freelist/FreeList.java | 6 + .../cache/database/freelist/FreeListImpl.java | 43 ++++ .../cache/database/freelist/PagesList.java | 258 ++++++++++++------- .../cluster/IgniteChangeGlobalStateSupport.java | 2 + 8 files changed, 229 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index a6a159c..c71ab2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1157,6 +1157,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]"; log.info(msg); + + ctx.cache().context().database().dumpStatistics(log); } catch (IgniteClientDisconnectedException ignore) { // 
No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java index 2667e44..a93d186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java @@ -36,7 +36,7 @@ public interface Page extends AutoCloseable { public FullPageId fullId(); /** - * @return Pointer for modifying the page. + * @return Pointer for reading the page. */ public long getForReadPointer(); http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java index 718be39..92f427a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdUtils.java @@ -108,7 +108,7 @@ public final class PageIdUtils { * @return Page ID. */ public static long pageId(long link) { - return flag(link) == PageIdAllocator.FLAG_IDX? link : link & PAGE_ID_MASK; + return flag(link) == PageIdAllocator.FLAG_IDX ? 
link : link & PAGE_ID_MASK; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 9c10057..6370798 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -21,6 +21,7 @@ import java.io.File; import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.internal.GridKernalContext; @@ -76,6 +77,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** + * @param log Logger. + */ + public void dumpStatistics(IgniteLogger log) { + if (freeList != null) + freeList.dumpStatistics(log); + } + + /** * @throws IgniteCheckedException If failed. 
*/ protected void initDataStructures() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java index d72c5b9..3266b95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.database.freelist; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; /** @@ -42,4 +43,9 @@ public interface FreeList { * @throws IgniteCheckedException If failed. */ public void removeDataRowByLink(long link) throws IgniteCheckedException; + + /** + * @param log Logger. 
+ */ + public void dumpStatistics(IgniteLogger log); } http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java index 94fcc17..d6debd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.database.freelist; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -314,6 +315,48 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { init(metaPageId, initNew); } + /** {@inheritDoc} */ + @Override public void dumpStatistics(IgniteLogger log) { + long dataPages = 0; + + final boolean dumpBucketsInfo = false; + + for (int b = 0; b < BUCKETS; b++) { + long size = bucketsSize[b].longValue(); + + if (!isReuseBucket(b)) + dataPages += size; + + if (dumpBucketsInfo) { + Stripe[] stripes = getBucket(b); + + boolean empty = true; + + if (stripes != null) { + for (Stripe stripe : stripes) { + if (!stripe.empty) { + empty = false; + + break; + } + } + } + + log.info("Bucket [b=" + b + + ", size=" + size + + ", stripes=" + (stripes != null ? 
stripes.length : 0) + + ", stripesEmpty=" + empty + ']'); + } + } + + if (dataPages > 0) { + log.info("FreeList [name=" + name + + ", buckets=" + BUCKETS + + ", dataPages=" + dataPages + + ", reusePages=" + bucketsSize[REUSE_BUCKET].longValue() + "]"); + } + } + /** * @param freeSpace Page free space. * @param allowReuse {@code True} if it is allowed to get reuse bucket. http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java index 48e0eb3..e5430cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; +import org.jsr166.LongAdder8; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; @@ -74,6 +75,12 @@ public abstract class PagesList extends DataStructure { IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_STRIPES_PER_BUCKET", Math.min(8, Runtime.getRuntime().availableProcessors() * 2)); + /** */ + private final boolean trackBucketsSize = IgniteSystemProperties.getBoolean("IGNITE_PAGES_LIST_TRACK_SIZE", false); + + /** */ + protected final LongAdder8[] bucketsSize; + /** Page ID to store list metadata. 
*/ private final long metaPageId; @@ -134,6 +141,11 @@ public abstract class PagesList extends DataStructure { this.name = name; this.buckets = buckets; this.metaPageId = metaPageId; + + bucketsSize = new LongAdder8[buckets]; + + for (int i = 0; i < buckets; i++) + bucketsSize[i] = new LongAdder8(); } /** @@ -340,6 +352,7 @@ public abstract class PagesList extends DataStructure { * Adds stripe to the given bucket. * * @param bucket Bucket. + * @param reuse {@code True} if possible to use reuse list. * @throws IgniteCheckedException If failed. * @return Tail page ID. */ @@ -375,8 +388,9 @@ public abstract class PagesList extends DataStructure { * @param bucket Bucket index. * @param oldTailId Old tail page ID to replace. * @param newTailId New tail page ID. + * @return {@code True} if stripe was removed. */ - private void updateTail(int bucket, long oldTailId, long newTailId) { + private boolean updateTail(int bucket, long oldTailId, long newTailId) { int idx = -1; for (;;) { @@ -391,6 +405,12 @@ public abstract class PagesList extends DataStructure { assert tails[idx].tailId == oldTailId; if (newTailId == 0L) { + if (tails.length <= MAX_STRIPES_PER_BUCKET / 2) { + tails[idx].empty = true; + + return false; + } + Stripe[] newTails; if (tails.length != 1) @@ -399,13 +419,13 @@ public abstract class PagesList extends DataStructure { newTails = null; // Drop the bucket completely. if (casBucket(bucket, tails, newTails)) - return; + return true; } else { - // It is safe to assign new tail since we do it only when write lock lock on tail is held. + // It is safe to assign new tail since we do it only when write lock on tail is held. tails[idx].tailId = newTailId; - return; + return true; } } } @@ -494,13 +514,14 @@ public abstract class PagesList extends DataStructure { /** * @param bag Reuse bag. - * @param dataPageBuf Data page buffer. + * @param dataPage Data page. + * @param dataPageAddr Data page address. * @param bucket Bucket. 
* @throws IgniteCheckedException If failed. */ - protected final void put(ReuseBag bag, Page dataPage, long dataPageBuf, int bucket) + protected final void put(ReuseBag bag, Page dataPage, long dataPageAddr, int bucket) throws IgniteCheckedException { - assert bag == null ^ dataPageBuf == 0L; + assert bag == null ^ dataPageAddr == 0L; for (int lockAttempt = 0; ;) { Stripe stripe = getPageForPut(bucket); @@ -508,35 +529,42 @@ public abstract class PagesList extends DataStructure { long tailId = stripe.tailId; try (Page tail = page(tailId)) { - long buf = writeLockPage(tail, bucket, lockAttempt++); // Explicit check. + long pageAddr = writeLockPage(tail, bucket, lockAttempt++); // Explicit check. + + if (pageAddr == 0L) { + if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS) + addStripeForReuseBucket(bucket); - if (buf == 0L) continue; + } - assert PageIO.getPageId(buf) == tailId : "bufPageId = " + PageIO.getPageId(buf) + ", tailId = " + tailId; - assert PageIO.getType(buf) == PageIO.T_PAGE_LIST_NODE; + assert PageIO.getPageId(pageAddr) == tailId : "pageId = " + PageIO.getPageId(pageAddr) + ", tailId = " + tailId; + assert PageIO.getType(pageAddr) == PageIO.T_PAGE_LIST_NODE; boolean ok = false; try { - PagesListNodeIO io = PageIO.getPageIO(buf); + PagesListNodeIO io = PageIO.getPageIO(pageAddr); ok = bag != null ? // Here we can always take pages from the bag to build our list. - putReuseBag(tailId, tail, buf, io, bag, bucket) : + putReuseBag(tailId, tail, pageAddr, io, bag, bucket) : // Here we can use the data page to build list only if it is empty and // it is being put into reuse bucket. Usually this will be true, but there is // a case when there is no reuse bucket in the free list, but then deadlock // on node page allocation from separate reuse list is impossible. // If the data page is not empty it can not be put into reuse bucket and thus // the deadlock is impossible as well. 
- putDataPage(tailId, tail, buf, io, dataPage, dataPageBuf, bucket); + putDataPage(tailId, tail, pageAddr, io, dataPage, dataPageAddr, bucket); + + if (ok) { + stripe.empty = false; - if (ok) return; + } } finally { - writeUnlock(tail, buf, ok); + writeUnlock(tail, pageAddr, ok); } } } @@ -545,10 +573,10 @@ public abstract class PagesList extends DataStructure { /** * @param pageId Page ID. * @param page Page. - * @param buf Byte buffer. + * @param pageAddr Page address. * @param io IO. * @param dataPage Data page. - * @param dataPageBuf Data page buffer. + * @param dataPageAddr Data page address. * @param bucket Bucket. * @return {@code true} If succeeded. * @throws IgniteCheckedException If failed. @@ -556,27 +584,30 @@ public abstract class PagesList extends DataStructure { private boolean putDataPage( long pageId, Page page, - long buf, + long pageAddr, PagesListNodeIO io, Page dataPage, - long dataPageBuf, + long dataPageAddr, int bucket ) throws IgniteCheckedException { - if (io.getNextId(buf) != 0L) + if (io.getNextId(pageAddr) != 0L) return false; // Splitted. 
long dataPageId = dataPage.id(); - int idx = io.addPage(buf, dataPageId, pageSize()); + int idx = io.addPage(pageAddr, dataPageId, pageSize()); if (idx == -1) - handlePageFull(pageId, page, buf, io, dataPage, dataPageBuf, bucket); + handlePageFull(pageId, page, pageAddr, io, dataPage, dataPageAddr, bucket); else { + if (trackBucketsSize) + bucketsSize[bucket].increment(); + if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListAddPageRecord(cacheId, pageId, dataPageId)); - DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageBuf); - dataIO.setFreeListPageId(dataPageBuf, pageId); + DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageAddr); + dataIO.setFreeListPageId(dataPageAddr, pageId); if (isWalDeltaRecordNeeded(wal, dataPage)) wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPage.id(), pageId)); @@ -588,34 +619,34 @@ public abstract class PagesList extends DataStructure { /** * @param pageId Page ID. * @param page Page. - * @param buf Buffer. + * @param pageAddr Page address. * @param io IO. * @param dataPage Data page. - * @param dataPageBuf Data page buffer. + * @param dataPageAddr Data page address. * @param bucket Bucket index. * @throws IgniteCheckedException If failed. */ private void handlePageFull( long pageId, Page page, - long buf, + long pageAddr, PagesListNodeIO io, Page dataPage, - long dataPageBuf, + long dataPageAddr, int bucket ) throws IgniteCheckedException { long dataPageId = dataPage.id(); - DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageBuf); + DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageAddr); // Attempt to add page failed: the node page is full. if (isReuseBucket(bucket)) { // If we are on the reuse bucket, we can not allocate new page, because it may cause deadlock. - assert dataIO.isEmpty(dataPageBuf); // We can put only empty data pages to reuse bucket. + assert dataIO.isEmpty(dataPageAddr); // We can put only empty data pages to reuse bucket. 
// Change page type to index and add it as next node page to this list. dataPageId = PageIdUtils.changeType(dataPageId, FLAG_IDX); - setupNextPage(io, pageId, buf, dataPageId, dataPageBuf); + setupNextPage(io, pageId, pageAddr, dataPageId, dataPageAddr); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListSetNextRecord(cacheId, pageId, dataPageId)); @@ -641,7 +672,7 @@ public abstract class PagesList extends DataStructure { assert nextPageAddr != 0L; try { - setupNextPage(io, pageId, buf, nextId, nextPageAddr); + setupNextPage(io, pageId, pageAddr, nextId, nextPageAddr); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListSetNextRecord(cacheId, pageId, nextId)); @@ -664,7 +695,10 @@ public abstract class PagesList extends DataStructure { assert idx != -1; - dataIO.setFreeListPageId(dataPageBuf, nextId); + if (trackBucketsSize) + bucketsSize[bucket].increment(); + + dataIO.setFreeListPageId(dataPageAddr, nextId); if (isWalDeltaRecordNeeded(wal, dataPage)) wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, nextId)); @@ -681,7 +715,7 @@ public abstract class PagesList extends DataStructure { /** * @param pageId Page ID. * @param page Page. - * @param buf Buffer. + * @param pageAddr Page address. * @param io IO. * @param bag Reuse bag. * @param bucket Bucket. @@ -692,24 +726,24 @@ public abstract class PagesList extends DataStructure { private boolean putReuseBag( final long pageId, Page page, - final long buf, + final long pageAddr, PagesListNodeIO io, ReuseBag bag, int bucket ) throws IgniteCheckedException { - if (io.getNextId(buf) != 0L) + if (io.getNextId(pageAddr) != 0L) return false; // Splitted. long nextId; - long prevBuf = buf; + long prevPageAddr = pageAddr; long prevId = pageId; List<Page> locked = null; // TODO may be unlock right away and do not keep all these pages locked? 
- List<Long> lockedBufs = null; + List<Long> lockedAddrs = null; try { while ((nextId = bag.pollFreePage()) != 0L) { - int idx = io.addPage(prevBuf, nextId, pageSize()); + int idx = io.addPage(prevPageAddr, nextId, pageSize()); if (idx == -1) { // Attempt to add page failed: the node page is full. try (Page next = page(nextId)) { @@ -718,14 +752,14 @@ public abstract class PagesList extends DataStructure { assert nextPageAddr != 0L; if (locked == null) { - lockedBufs = new ArrayList<>(2); + lockedAddrs = new ArrayList<>(2); locked = new ArrayList<>(2); } locked.add(next); - lockedBufs.add(nextPageAddr); + lockedAddrs.add(nextPageAddr); - setupNextPage(io, prevId, prevBuf, nextId, nextPageAddr); + setupNextPage(io, prevId, prevPageAddr, nextId, nextPageAddr); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListSetNextRecord(cacheId, prevId, nextId)); @@ -746,12 +780,15 @@ public abstract class PagesList extends DataStructure { // Switch to this new page, which is now a part of our list // to add the rest of the bag to the new page. - prevBuf = nextPageAddr; + prevPageAddr = nextPageAddr; prevId = nextId; page = next; } } else { + if (trackBucketsSize) + bucketsSize[bucket].increment(); + // TODO: use single WAL record for bag? if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListAddPageRecord(cacheId, prevId, nextId)); @@ -765,7 +802,7 @@ public abstract class PagesList extends DataStructure { // Release write. for (int i = 0; i < locked.size(); i++) - writeUnlock(locked.get(i), lockedBufs.get(i), true); + writeUnlock(locked.get(i), lockedAddrs.get(i), true); } } @@ -803,7 +840,8 @@ public abstract class PagesList extends DataStructure { Stripe[] stripes = getBucket(bucket); if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET) { - addStripe(bucket, false); + if (!isReuseBucket(bucket)) + addStripe(bucket, true); return 0L; } @@ -813,6 +851,19 @@ public abstract class PagesList extends DataStructure { } /** + * @param bucket Bucket. 
+ * @throws IgniteCheckedException If failed. + */ + private void addStripeForReuseBucket(int bucket) throws IgniteCheckedException { + assert isReuseBucket(bucket); + + Stripe[] stripes = getBucket(bucket); + + if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET) + addStripe(bucket, false); + } + + /** * @param bucket Bucket index. * @param initIoVers Optional IO to initialize page. * @return Removed page ID. @@ -822,33 +873,40 @@ public abstract class PagesList extends DataStructure { for (int lockAttempt = 0; ;) { Stripe stripe = getPageForTake(bucket); - if (stripe == null) + if (stripe == null || stripe.empty) return 0L; long tailId = stripe.tailId; try (Page tail = page(tailId)) { - long tailBuf = writeLockPage(tail, bucket, lockAttempt++); // Explicit check. + long tailPageAddr = writeLockPage(tail, bucket, lockAttempt++); // Explicit check. + + if (tailPageAddr == 0L) { + if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS) + addStripeForReuseBucket(bucket); - if (tailBuf == 0L) continue; + } - assert PageIO.getPageId(tailBuf) == tailId : "tailId = " + tailId + ", tailBufId = " + PageIO.getPageId(tailBuf); - assert PageIO.getType(tailBuf) == PageIO.T_PAGE_LIST_NODE; + assert PageIO.getPageId(tailPageAddr) == tailId : "tailId = " + tailId + ", tailPageId = " + PageIO.getPageId(tailPageAddr); + assert PageIO.getType(tailPageAddr) == PageIO.T_PAGE_LIST_NODE; boolean dirty = false; long ret = 0L; long recycleId = 0L; try { - PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailBuf); + PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailPageAddr); - if (io.getNextId(tailBuf) != 0) + if (io.getNextId(tailPageAddr) != 0) continue; - long pageId = io.takeAnyPage(tailBuf); + long pageId = io.takeAnyPage(tailPageAddr); if (pageId != 0L) { + if (trackBucketsSize) + bucketsSize[bucket].decrement(); + if (isWalDeltaRecordNeeded(wal, tail)) wal.log(new PagesListRemovePageRecord(cacheId, tailId, pageId)); @@ -858,8 +916,8 @@ public abstract 
class PagesList extends DataStructure { // If we got an empty page in non-reuse bucket, move it back to reuse list // to prevent empty page leak to data pages. - if (io.isEmpty(tailBuf) && !isReuseBucket(bucket)) { - long prevId = io.getPreviousId(tailBuf); + if (io.isEmpty(tailPageAddr) && !isReuseBucket(bucket)) { + long prevId = io.getPreviousId(tailPageAddr); if (prevId != 0L) { try (Page prev = page(prevId)) { @@ -869,13 +927,13 @@ public abstract class PagesList extends DataStructure { assert ok == TRUE : ok; } - recycleId = recyclePage(tailId, tail, tailBuf); + recycleId = recyclePage(tailId, tail, tailPageAddr); } } } else { // The tail page is empty. We can unlink and return it if we have a previous page. - long prevId = io.getPreviousId(tailBuf); + long prevId = io.getPreviousId(tailPageAddr); if (prevId != 0L) { // This can only happen if we are in the reuse bucket. @@ -893,7 +951,7 @@ public abstract class PagesList extends DataStructure { PageIO initIo = initIoVers.latest(); - initIo.initNewPage(tailBuf, tailId, pageSize()); + initIo.initNewPage(tailPageAddr, tailId, pageSize()); if (isWalDeltaRecordNeeded(wal, tail)) { wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(), @@ -901,12 +959,14 @@ public abstract class PagesList extends DataStructure { } } else - tailId = recyclePage(tailId, tail, tailBuf); + tailId = recyclePage(tailId, tail, tailPageAddr); dirty = true; ret = tailId; } + else + stripe.empty = true; } // If we do not have a previous page (we are at head), then we still can return @@ -915,7 +975,7 @@ public abstract class PagesList extends DataStructure { // meta page. } finally { - writeUnlock(tail, tailBuf, dirty); + writeUnlock(tail, tailPageAddr, dirty); } // Put recycled page (if any) to the reuse bucket after tail is unlocked. @@ -932,22 +992,21 @@ public abstract class PagesList extends DataStructure { /** * @param dataPage Data page. - * @param dataPageBuf Data page buffer. + * @param dataPageAddr Data page address. 
* @param dataIO Data page IO. * @param bucket Bucket index. * @throws IgniteCheckedException If failed. * @return {@code True} if page was removed. */ - protected final boolean removeDataPage(Page dataPage, long dataPageBuf, DataPageIO dataIO, int bucket) + protected final boolean removeDataPage(Page dataPage, long dataPageAddr, DataPageIO dataIO, int bucket) throws IgniteCheckedException { long dataPageId = dataPage.id(); - long pageId = dataIO.getFreeListPageId(dataPageBuf); + long pageId = dataIO.getFreeListPageId(dataPageAddr); assert pageId != 0; try (Page page = page(pageId)) { - long prevId; long nextId; long recycleId = 0L; @@ -967,11 +1026,14 @@ public abstract class PagesList extends DataStructure { if (!rmvd) return false; + if (trackBucketsSize) + bucketsSize[bucket].decrement(); + if (isWalDeltaRecordNeeded(wal, page)) wal.log(new PagesListRemovePageRecord(cacheId, pageId, dataPageId)); // Reset free list page ID. - dataIO.setFreeListPageId(dataPageBuf, 0L); + dataIO.setFreeListPageId(dataPageAddr, 0L); if (isWalDeltaRecordNeeded(wal, dataPage)) wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, 0L)); @@ -981,12 +1043,14 @@ public abstract class PagesList extends DataStructure { // If the page is empty, we have to try to drop it and link next and previous with each other. nextId = io.getNextId(pageAddr); - prevId = io.getPreviousId(pageAddr); // If there are no next page, then we can try to merge without releasing current write lock, // because if we will need to lock previous page, the locking order will be already correct. - if (nextId == 0L) + if (nextId == 0L) { + long prevId = io.getPreviousId(pageAddr); + recycleId = mergeNoNext(pageId, page, pageAddr, prevId, bucket); + } } finally { writeUnlock(page, pageAddr, rmvd); @@ -1006,13 +1070,13 @@ public abstract class PagesList extends DataStructure { /** * @param page Page. * @param pageId Page ID. - * @param buf Page byte buffer. + * @param pageAddr Page address. 
* @param prevId Previous page ID. * @param bucket Bucket index. * @return Page ID to recycle. * @throws IgniteCheckedException If failed. */ - private long mergeNoNext(long pageId, Page page, long buf, long prevId, int bucket) + private long mergeNoNext(long pageId, Page page, long pageAddr, long prevId, int bucket) throws IgniteCheckedException { // If we do not have a next page (we are tail) and we are on reuse bucket, // then we can leave as is as well, because it is normal to have an empty tail page here. @@ -1026,10 +1090,15 @@ public abstract class PagesList extends DataStructure { assert ok == TRUE: ok; // Because we keep lock on current tail and do a world consistency check. } } - else // If we don't have a previous, then we are tail page of free list, just drop the stripe. - updateTail(bucket, pageId, 0L); + else { + // If we don't have a previous, then we are tail page of free list, just drop the stripe. + boolean rmvd = updateTail(bucket, pageId, 0L); + + if (!rmvd) + return 0L; + } - return recyclePage(pageId, page, buf); + return recyclePage(pageId, page, pageAddr); } /** @@ -1091,10 +1160,10 @@ public abstract class PagesList extends DataStructure { * @param page Page. * @param pageId Page ID. * @param io IO. - * @param buf Byte buffer. + * @param pageAddr Page address. * @param next Next page. * @param nextId Next page ID. - * @param nextBuf Next buffer. + * @param nextPageAddr Next page address. * @param bucket Bucket index. * @return Page to recycle. * @throws IgniteCheckedException If failed. 
@@ -1102,35 +1171,35 @@ public abstract class PagesList extends DataStructure { private long doMerge( long pageId, Page page, - long buf, + long pageAddr, PagesListNodeIO io, Page next, long nextId, - long nextBuf, + long nextPageAddr, int bucket ) throws IgniteCheckedException { - long prevId = io.getPreviousId(buf); + long prevId = io.getPreviousId(pageAddr); if (nextId == 0L) - return mergeNoNext(pageId, page, buf, prevId, bucket); + return mergeNoNext(pageId, page, pageAddr, prevId, bucket); else { // No one must be able to merge it while we keep a reference. - assert getPageId(nextBuf) == nextId; + assert getPageId(nextPageAddr) == nextId; if (prevId == 0L) { // No previous page: we are at head. // These references must be updated at the same time in write locks. - assert PagesListNodeIO.VERSIONS.forPage(nextBuf).getPreviousId(nextBuf) == pageId; + assert PagesListNodeIO.VERSIONS.forPage(nextPageAddr).getPreviousId(nextPageAddr) == pageId; - PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextBuf); - nextIO.setPreviousId(nextBuf, 0); + PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextPageAddr); + nextIO.setPreviousId(nextPageAddr, 0); if (isWalDeltaRecordNeeded(wal, next)) wal.log(new PagesListSetPreviousRecord(cacheId, nextId, 0L)); } else // Do a fair merge: link previous and next to each other. - fairMerge(prevId, pageId, nextId, next, nextBuf); + fairMerge(prevId, pageId, nextId, next, nextPageAddr); - return recyclePage(pageId, page, buf); + return recyclePage(pageId, page, pageAddr); } } @@ -1141,14 +1210,14 @@ public abstract class PagesList extends DataStructure { * @param pageId Page ID. * @param next Next page. * @param nextId Next page ID. - * @param nextBuf Next buffer. + * @param nextPageAddr Next page address. * @throws IgniteCheckedException If failed. 
*/ private void fairMerge(long prevId, long pageId, long nextId, Page next, - long nextBuf) + long nextPageAddr) throws IgniteCheckedException { try (Page prev = page(prevId)) { long prevPageAddr = writeLock(prev); // No check, we keep a reference. @@ -1157,18 +1226,18 @@ public abstract class PagesList extends DataStructure { try { PagesListNodeIO prevIO = PagesListNodeIO.VERSIONS.forPage(prevPageAddr); - PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextBuf); + PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextPageAddr); // These references must be updated at the same time in write locks. assert prevIO.getNextId(prevPageAddr) == pageId; - assert nextIO.getPreviousId(nextBuf) == pageId; + assert nextIO.getPreviousId(nextPageAddr) == pageId; prevIO.setNextId(prevPageAddr, nextId); if (isWalDeltaRecordNeeded(wal, prev)) wal.log(new PagesListSetNextRecord(cacheId, prevId, nextId)); - nextIO.setPreviousId(nextBuf, prevId); + nextIO.setPreviousId(nextPageAddr, prevId); if (isWalDeltaRecordNeeded(wal, next)) wal.log(new PagesListSetPreviousRecord(cacheId, nextId, prevId)); @@ -1182,14 +1251,14 @@ public abstract class PagesList extends DataStructure { /** * @param page Page. * @param pageId Page ID. - * @param buf Byte buffer. + * @param pageAddr Page address. * @return Rotated page ID. * @throws IgniteCheckedException If failed. */ - private long recyclePage(long pageId, Page page, long buf) throws IgniteCheckedException { + private long recyclePage(long pageId, Page page, long pageAddr) throws IgniteCheckedException { pageId = PageIdUtils.rotatePageId(pageId); - PageIO.setPageId(buf, pageId); + PageIO.setPageId(pageAddr, pageId); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new RecycleRecord(cacheId, page.id(), pageId)); @@ -1238,6 +1307,9 @@ public abstract class PagesList extends DataStructure { /** */ public volatile long tailId; + /** */ + volatile boolean empty; + /** * @param tailId Tail ID. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/db548dc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java index 5f8aec9..3dd9911 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java @@ -28,6 +28,7 @@ public interface IgniteChangeGlobalStateSupport { * Called when cluster performing activation. * * @param kctx Kernal context. + * @throws IgniteCheckedException If failed. */ public void onActivate(GridKernalContext kctx) throws IgniteCheckedException; @@ -35,6 +36,7 @@ public interface IgniteChangeGlobalStateSupport { * Called when cluster performing deactivation. * * @param kctx Kernal context. + * @throws IgniteCheckedException If failed. */ public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException; }
