http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index 955ca69..a25d794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.cache.database; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; import org.apache.ignite.internal.processors.cache.IncompleteObject; @@ -94,25 +93,26 @@ public class CacheDataRowAdapter implements CacheDataRow { /** * Read row from data pages. * - * @param cctx Cache context. + * @param grp Cache group. * @param rowData Required row data. * @throws IgniteCheckedException If failed. 
*/ - public final void initFromLink(GridCacheContext<?, ?> cctx, RowData rowData) throws IgniteCheckedException { - initFromLink(cctx, cctx.shared(), cctx.memoryPolicy().pageMemory(), rowData); + public final void initFromLink(CacheGroupContext grp, RowData rowData) throws IgniteCheckedException { + initFromLink(grp, grp.shared(), grp.memoryPolicy().pageMemory(), rowData); } /** * Read row from data pages. * Can be called with cctx == null, if cache instance is unknown, but its ID is stored in the data row. * - * @param cctx Cctx. + * @param grp Cache group. * @param sharedCtx Shared context. * @param pageMem Page memory. * @param rowData Row data. + * @throws IgniteCheckedException If failed. */ public final void initFromLink( - @Nullable GridCacheContext<?, ?> cctx, + @Nullable CacheGroupContext grp, GridCacheSharedContext<?, ?> sharedCtx, PageMemory pageMem, RowData rowData) @@ -120,14 +120,9 @@ public class CacheDataRowAdapter implements CacheDataRow { assert link != 0 : "link"; assert key == null : "key"; - CacheObjectContext coctx = null; - - if (cctx != null) { - cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ? - cctx.cacheId() : 0; // Force cacheId reading for evictable memory policies. + CacheObjectContext coctx = grp != null ? grp.cacheObjectContext() : null; - coctx = cctx.cacheObjectContext(); - } + boolean readCacheId = grp == null || grp.storeCacheIdInDataPage(); long nextLink = link; IncompleteObject<?> incomplete = null; @@ -135,10 +130,16 @@ public class CacheDataRowAdapter implements CacheDataRow { do { final long pageId = pageId(nextLink); - final long page = pageMem.acquirePage(cacheId, pageId); + + // Group is null if try evict page, with persistence evictions should be disabled. + assert grp != null || !sharedCtx.database().persistenceEnabled(); + + int grpId = grp != null ? 
grp.groupId() : 0; + + final long page = pageMem.acquirePage(grpId, pageId); try { - long pageAddr = pageMem.readLock(cacheId, pageId, page); // Non-empty data page must not be recycled. + long pageAddr = pageMem.readLock(grpId, pageId, page); // Non-empty data page must not be recycled. assert pageAddr != 0L : nextLink; @@ -154,7 +155,7 @@ public class CacheDataRowAdapter implements CacheDataRow { if (first) { if (nextLink == 0) { // Fast path for a single page row. - readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData); + readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId); return; } @@ -169,17 +170,17 @@ public class CacheDataRowAdapter implements CacheDataRow { boolean keyOnly = rowData == RowData.KEY_ONLY; - incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, incomplete); + incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete); if (keyOnly && key != null) return; } finally { - pageMem.readUnlock(cacheId, pageId, page); + pageMem.readUnlock(grpId, pageId, page); } } finally { - pageMem.releasePage(cacheId, pageId, page); + pageMem.releasePage(grpId, pageId, page); } } while(nextLink != 0); @@ -188,9 +189,11 @@ public class CacheDataRowAdapter implements CacheDataRow { } /** + * @param sharedCtx Cache shared context. * @param coctx Cache object context. * @param buf Buffer. * @param keyOnly {@code true} If need to read only key object. + * @param readCacheId {@code true} If need to read cache ID. * @param incomplete Incomplete object. * @throws IgniteCheckedException If failed. * @return Read object. 
@@ -200,9 +203,10 @@ public class CacheDataRowAdapter implements CacheDataRow { CacheObjectContext coctx, ByteBuffer buf, boolean keyOnly, + boolean readCacheId, IncompleteObject<?> incomplete ) throws IgniteCheckedException { - if (cacheId == 0) { + if (readCacheId && cacheId == 0) { incomplete = readIncompleteCacheId(buf, incomplete); if (cacheId == 0) @@ -211,8 +215,13 @@ public class CacheDataRowAdapter implements CacheDataRow { incomplete = null; } - if (coctx == null) + if (coctx == null) { + // coctx can be null only when grp is null too, this means that + // we are in process of eviction and cacheId is mandatory part of data. + assert cacheId != 0; + coctx = sharedCtx.cacheContext(cacheId).cacheObjectContext(); + } // Read key. if (key == null) { @@ -251,20 +260,23 @@ public class CacheDataRowAdapter implements CacheDataRow { } /** + * @param sharedCtx Cache shared context. * @param coctx Cache object context. * @param addr Address. * @param rowData Required row data. + * @param readCacheId {@code true} If need to read cache ID. * @throws IgniteCheckedException If failed. */ private void readFullRow( GridCacheSharedContext<?, ?> sharedCtx, CacheObjectContext coctx, long addr, - RowData rowData) + RowData rowData, + boolean readCacheId) throws IgniteCheckedException { int off = 0; - if (cacheId == 0) { + if (readCacheId) { cacheId = PageUtils.getInt(addr, off); off += 4; @@ -328,6 +340,8 @@ public class CacheDataRowAdapter implements CacheDataRow { if (remaining >= size) { cacheId = buf.getInt(); + assert cacheId != 0; + return null; } @@ -342,6 +356,8 @@ public class CacheDataRowAdapter implements CacheDataRow { timeBuf.order(buf.order()); cacheId = timeBuf.getInt(); + + assert cacheId != 0; } return incomplete; @@ -401,7 +417,6 @@ public class CacheDataRowAdapter implements CacheDataRow { * @param buf Buffer. * @param incomplete Incomplete object. * @return Incomplete object. - * @throws IgniteCheckedException If failed. 
*/ private IncompleteObject<?> readIncompleteExpireTime( ByteBuffer buf,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java index d51cf0e..6e429c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java @@ -37,4 +37,9 @@ public interface CacheSearchRow { * @return Key hash code. */ public int hash(); + + /** + * @return Cache ID or {@code 0} if cache ID is not defined. + */ + public int cacheId(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index fd5e2a2..19c25aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; +import 
org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; @@ -730,9 +731,9 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** - * @param stoppedCtxs A collection of tuples (cache context, destroy flag). + * @param stoppedGrps A collection of tuples (cache group, destroy flag). */ - public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) { + public void onCacheGroupsStopped(Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps) { // No-op. } @@ -768,12 +769,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** * Reserve update history for preloading. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param partId Partition Id. * @param cntr Update counter. * @return True if successfully reserved. 
*/ - public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) { + public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java index 5b87cf7..91957db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java @@ -64,49 +64,49 @@ public class IgniteCacheSnapshotManager extends GridCacheSharedManagerAdapter { * */ public void restoreState() throws IgniteCheckedException { - + // No-op. } /** * */ public void onCheckPointBegin() { - + // No-op. } /** * */ public void beforeCheckpointPageWritten() { - + // No-op. } /** * */ public void afterCheckpointPageWritten() { - + // No-op. } /** * @param fullId Full id. */ public void beforePageWrite(FullPageId fullId) { - + // No-op. } /** * @param fullId Full id. */ public void onPageWrite(FullPageId fullId, ByteBuffer tmpWriteBuf) { - + // No-op. } /** * @param cctx Cctx. */ public void onCacheStop(GridCacheContext cctx) { - + // No-op. 
} /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java index 91fed4c..c21b818 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java @@ -29,7 +29,7 @@ public interface MetaStore { * @param idxName Index name. * @return {@link RootPage} that keeps pageId, allocated flag that shows whether the page * was newly allocated, and rootId that is counter which increments each time new page allocated. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ public RootPage getOrAllocateForTree(String idxName) throws IgniteCheckedException; @@ -38,14 +38,14 @@ public interface MetaStore { * * @param idxName Index name. * @return Root ID or -1 if no page was removed. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ public RootPage dropRootPage(String idxName) throws IgniteCheckedException; /** * Destroy this meta store. * - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. 
*/ public void destroy() throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java index ca4ad05..139bf73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java @@ -53,8 +53,8 @@ public class MetadataStorage implements MetaStore { /** Meta page reuse tree. */ private final ReuseList reuseList; - /** Cache ID. */ - private final int cacheId; + /** Cache group ID. */ + private final int grpId; /** */ private final int allocPartId; @@ -70,7 +70,7 @@ public class MetadataStorage implements MetaStore { final PageMemory pageMem, final IgniteWriteAheadLogManager wal, final AtomicLong globalRmvId, - final int cacheId, + final int grpId, final int allocPartId, final byte allocSpace, final ReuseList reuseList, @@ -79,12 +79,12 @@ public class MetadataStorage implements MetaStore { ) { try { this.pageMem = pageMem; - this.cacheId = cacheId; + this.grpId = grpId; this.allocPartId = allocPartId; this.allocSpace = allocSpace; this.reuseList = reuseList; - metaTree = new MetaTree(cacheId, allocPartId, allocSpace, pageMem, wal, globalRmvId, rootPageId, + metaTree = new MetaTree(grpId, allocPartId, allocSpace, pageMem, wal, globalRmvId, rootPageId, reuseList, MetaStoreInnerIO.VERSIONS, MetaStoreLeafIO.VERSIONS, initNew); } catch (IgniteCheckedException e) { @@ -111,14 +111,14 @@ public class MetadataStorage implements MetaStore { if (reuseList != null) pageId = reuseList.takeRecycledPage(); - pageId = 
pageId == 0 ? pageMem.allocatePage(cacheId, allocPartId, allocSpace) : pageId; + pageId = pageId == 0 ? pageMem.allocatePage(grpId, allocPartId, allocSpace) : pageId; tree.put(new IndexItem(idxNameBytes, pageId)); - return new RootPage(new FullPageId(pageId, cacheId), true); + return new RootPage(new FullPageId(pageId, grpId), true); } else { - final FullPageId pageId = new FullPageId(row.pageId, cacheId); + final FullPageId pageId = new FullPageId(row.pageId, grpId); return new RootPage(pageId, false); } @@ -134,10 +134,10 @@ public class MetadataStorage implements MetaStore { if (row != null) { if (reuseList == null) - pageMem.freePage(cacheId, row.pageId); + pageMem.freePage(grpId, row.pageId); } - return row != null ? new RootPage(new FullPageId(row.pageId, cacheId), false) : null; + return row != null ? new RootPage(new FullPageId(row.pageId, grpId), false) : null; } /** {@inheritDoc} */ @@ -288,7 +288,7 @@ public class MetadataStorage implements MetaStore { PageUtils.putByte(dstPageAddr, dstOff, len); dstOff++; - PageHandler.copyMemory(srcPageAddr, srcOff, dstPageAddr, dstOff, len); + PageHandler.copyMemory(srcPageAddr, dstPageAddr, srcOff, dstOff, len); srcOff += len; dstOff += len; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java index 563902b..d707869 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java @@ -19,8 +19,9 @@ package org.apache.ignite.internal.processors.cache.database; import org.apache.ignite.IgniteCheckedException; 
import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; /** @@ -31,27 +32,29 @@ public class RowStore { private final FreeList freeList; /** */ - protected final PageMemory pageMem; + private final GridCacheSharedContext ctx; /** */ - protected final GridCacheContext<?,?> cctx; + protected final PageMemory pageMem; /** */ protected final CacheObjectContext coctx; + + /** - * @param cctx Cache context. + * @param grp Cache group. * @param freeList Free list. */ - public RowStore(GridCacheContext<?,?> cctx, FreeList freeList) { - assert cctx != null; + public RowStore(CacheGroupContext grp, FreeList freeList) { + assert grp != null; assert freeList != null; - this.cctx = cctx; this.freeList = freeList; - coctx = cctx.cacheObjectContext(); - pageMem = cctx.memoryPolicy().pageMemory(); + ctx = grp.shared(); + coctx = grp.cacheObjectContext(); + pageMem = grp.memoryPolicy().pageMemory(); } /** @@ -60,13 +63,13 @@ public class RowStore { */ public void removeRow(long link) throws IgniteCheckedException { assert link != 0; - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { freeList.removeDataRowByLink(link); } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } @@ -75,13 +78,13 @@ public class RowStore { * @throws IgniteCheckedException If failed. 
*/ public void addRow(CacheDataRow row) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { freeList.insertDataRow(row); } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java index 2465d3f..d92f811 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java @@ -884,11 +884,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param upper Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. * @return Cursor. * @throws IgniteCheckedException If failed. */ - private GridCursor<T> findLowerUnbounded(L upper) throws IgniteCheckedException { - ForwardCursor cursor = new ForwardCursor(null, upper); + private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException { + ForwardCursor cursor = new ForwardCursor(null, upper, x); long firstPageId; @@ -933,14 +934,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Cursor. * @throws IgniteCheckedException If failed. 
*/ - @Override public final GridCursor<T> find(L lower, L upper) throws IgniteCheckedException { + @Override public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException { + return find(lower, upper, null); + } + + /** + * @param lower Lower bound inclusive or {@code null} if unbounded. + * @param upper Upper bound inclusive or {@code null} if unbounded. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * @return Cursor. + * @throws IgniteCheckedException If failed. + */ + public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException { checkDestroyed(); try { if (lower == null) - return findLowerUnbounded(upper); + return findLowerUnbounded(upper, x); - ForwardCursor cursor = new ForwardCursor(lower, upper); + ForwardCursor cursor = new ForwardCursor(lower, upper, x); cursor.find(); @@ -4392,6 +4404,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** */ private final L upperBound; + /** */ + private final Object x; + /** * @param lowerBound Lower bound. * @param upperBound Upper bound. @@ -4399,6 +4414,18 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements ForwardCursor(L lowerBound, L upperBound) { this.lowerBound = lowerBound; this.upperBound = upperBound; + this.x = null; + } + + /** + * @param lowerBound Lower bound. + * @param upperBound Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. 
+ */ + ForwardCursor(L lowerBound, L upperBound, Object x) { + this.lowerBound = lowerBound; + this.upperBound = upperBound; + this.x = x; } /** @@ -4515,7 +4542,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements rows = (T[])new Object[cnt]; for (int i = 0; i < cnt; i++) { - T r = getRow(io, pageAddr, startIdx + i); + T r = getRow(io, pageAddr, startIdx + i, x); rows = GridArrays.set(rows, i, r); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java index 2586696..e40ed11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java @@ -157,6 +157,21 @@ public abstract class PageIO { /** */ public static final short T_PAGE_UPDATE_TRACKING = 15; + /** */ + public static final short T_CACHE_ID_AWARE_DATA_REF_INNER = 16; + + /** */ + public static final short T_CACHE_ID_AWARE_DATA_REF_LEAF = 17; + + /** */ + public static final short T_CACHE_ID_AWARE_PENDING_REF_INNER = 18; + + /** */ + public static final short T_CACHE_ID_AWARE_PENDING_REF_LEAF = 19; + + /** */ + public static final short T_PART_CNTRS = 20; + /** Index for payload == 1. 
*/ public static final short T_H2_EX_REF_LEAF_START = 10000; @@ -430,6 +445,9 @@ public abstract class PageIO { case T_PART_META: return (Q)PagePartitionMetaIO.VERSIONS.forVersion(ver); + case T_PART_CNTRS: + return (Q)PagePartitionCountersIO.VERSIONS.forVersion(ver); + case T_PAGE_UPDATE_TRACKING: return (Q)TrackingPageIO.VERSIONS.forVersion(ver); @@ -484,6 +502,12 @@ public abstract class PageIO { case T_DATA_REF_LEAF: return (Q)IgniteCacheOffheapManagerImpl.DataLeafIO.VERSIONS.forVersion(ver); + case T_CACHE_ID_AWARE_DATA_REF_INNER: + return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataInnerIO.VERSIONS.forVersion(ver); + + case T_CACHE_ID_AWARE_DATA_REF_LEAF: + return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataLeafIO.VERSIONS.forVersion(ver); + case T_METASTORE_INNER: return (Q)MetadataStorage.MetaStoreInnerIO.VERSIONS.forVersion(ver); @@ -496,6 +520,12 @@ public abstract class PageIO { case T_PENDING_REF_LEAF: return (Q)IgniteCacheOffheapManagerImpl.PendingEntryLeafIO.VERSIONS.forVersion(ver); + case T_CACHE_ID_AWARE_PENDING_REF_INNER: + return (Q) IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryInnerIO.VERSIONS.forVersion(ver); + + case T_CACHE_ID_AWARE_PENDING_REF_LEAF: + return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryLeafIO.VERSIONS.forVersion(ver); + default: // For tests. 
if (innerTestIO != null && innerTestIO.getType() == type && innerTestIO.getVersion() == ver) http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java new file mode 100644 index 0000000..015b8ff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.ignite.internal.processors.cache.database.tree.io; + +import java.util.Map; +import org.apache.ignite.internal.pagemem.PageUtils; + +/** + * + */ +public class PagePartitionCountersIO extends PageIO { + /** */ + private static final int CNT_OFF = COMMON_HEADER_END; + + /** */ + private static final int LAST_FLAG_OFF = CNT_OFF + 2; + + /** */ + private static final int NEXT_COUNTERS_PAGE_OFF = LAST_FLAG_OFF + 1; + + /** */ + private static final int ITEMS_OFF = NEXT_COUNTERS_PAGE_OFF + 8; + + /** */ + private static final int ITEM_SIZE = 12; + + /** */ + private static final byte LAST_FLAG = 0b1; + + /** */ + public static final IOVersions<PagePartitionCountersIO> VERSIONS = new IOVersions<>( + new PagePartitionCountersIO(1) + ); + + /** + * @param ver Page format version. + */ + public PagePartitionCountersIO(int ver) { + super(T_PART_CNTRS, ver); + } + + /** {@inheritDoc} */ + @Override public void initNewPage(long pageAddr, long pageId, int pageSize) { + super.initNewPage(pageAddr, pageId, pageSize); + + setCount(pageAddr, 0); + setNextCountersPageId(pageAddr, 0); + } + + /** + * @param pageAddr Page address. + * @return Next counters page ID or {@code 0} if it does not exist. + */ + public long getNextCountersPageId(long pageAddr) { + return PageUtils.getLong(pageAddr, NEXT_COUNTERS_PAGE_OFF); + } + + /** + * @param pageAddr Page address. + * @param partMetaPageId Next counters page ID. + */ + public void setNextCountersPageId(long pageAddr, long partMetaPageId) { + PageUtils.putLong(pageAddr, NEXT_COUNTERS_PAGE_OFF, partMetaPageId); + } + + /** + * @param pageSize Page size. + * @param pageAddr Page address. + * @param cacheSizes Serialized cache size items (pairs of cache ID and its size). + * @return Number of written pairs. 
+ */ + public int writeCacheSizes(int pageSize, long pageAddr, byte[] cacheSizes, int itemsOff) { + assert cacheSizes != null; + assert cacheSizes.length % ITEM_SIZE == 0 : cacheSizes.length; + + int cap = getCapacity(pageSize); + + int items = (cacheSizes.length / ITEM_SIZE) - itemsOff; + int write = Math.min(cap, items); + + if (write > 0) + // Nothing to copy (write == 0) when there are no items in a given partition for all caches in the group. + PageUtils.putBytes(pageAddr, ITEMS_OFF, cacheSizes, itemsOff * ITEM_SIZE, write * ITEM_SIZE); + + setCount(pageAddr, write); + + setLastFlag(pageAddr, write == items); + + return write; + } + + /** + * @param pageAddr Page address. + * @param res Result map of cache sizes. + * @return {@code True} if the map was fully read. + */ + public boolean readCacheSizes(long pageAddr, Map<Integer, Long> res) { + int cnt = getCount(pageAddr); + + assert cnt >= 0 && cnt <= Short.MAX_VALUE : cnt; + + if (cnt == 0) + return true; + + int off = ITEMS_OFF; + + for (int i = 0; i < cnt; i++) { + int cacheId = PageUtils.getInt(pageAddr, off); + off += 4; + + assert cacheId != 0; + + long cacheSize = PageUtils.getLong(pageAddr, off); + off += 8; + + assert cacheSize > 0 : cacheSize; + + Long old = res.put(cacheId, cacheSize); + + assert old == null; + } + + return getLastFlag(pageAddr); + } + + private boolean getLastFlag(long pageAddr) { + return PageUtils.getByte(pageAddr, LAST_FLAG_OFF) == LAST_FLAG; + } + + private void setLastFlag(long pageAddr, boolean last) { + PageUtils.putByte(pageAddr, LAST_FLAG_OFF, last ? LAST_FLAG : ~LAST_FLAG); + } + + /** + * @param pageAddr Page address. + * @return Stored items count. + */ + private int getCount(long pageAddr) { + return PageUtils.getShort(pageAddr, CNT_OFF); + } + + /** + * @param pageAddr Page address. + * @param cnt Stored items count. 
+ */ + private void setCount(long pageAddr, int cnt) { + assert cnt >= 0 && cnt <= Short.MAX_VALUE : cnt; + + PageUtils.putShort(pageAddr, CNT_OFF, (short)cnt); + } + + /** + * @param pageSize Page size. + * @return Maximum number of items which can be stored in buffer. + */ + private int getCapacity(int pageSize) { + return (pageSize - ITEMS_OFF) / ITEM_SIZE; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java index aca0725..67cc5a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java @@ -37,6 +37,9 @@ public class PagePartitionMetaIO extends PageMetaIO { private static final int PARTITION_STATE_OFF = GLOBAL_RMV_ID_OFF + 8; /** */ + private static final int NEXT_PART_META_PAGE_OFF = PARTITION_STATE_OFF + 1; + + /** */ public static final IOVersions<PagePartitionMetaIO> VERSIONS = new IOVersions<>( new PagePartitionMetaIO(1) ); @@ -49,6 +52,7 @@ public class PagePartitionMetaIO extends PageMetaIO { setUpdateCounter(pageAddr, 0); setGlobalRemoveId(pageAddr, 0); setPartitionState(pageAddr, (byte)-1); + setCountersPageId(pageAddr, 0); } /** @@ -120,4 +124,20 @@ public class PagePartitionMetaIO extends PageMetaIO { public void setPartitionState(long pageAddr, byte state) { PageUtils.putByte(pageAddr, PARTITION_STATE_OFF, state); } + + /** + * @param pageAddr Page address. + * @return Counters page ID or {@code 0} if it does not exist. 
+ */ + public long getCountersPageId(long pageAddr) { + return PageUtils.getLong(pageAddr, NEXT_PART_META_PAGE_OFF); + } + + /** + * @param pageAddr Page address. + * @param metaPageId Counters page ID or {@code 0} if it does not exist. + */ + public void setCountersPageId(long pageAddr, long metaPageId) { + PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, metaPageId); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index 5d1885e..c092132 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -24,7 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -38,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class GridCacheTtlUpdateRequest extends GridCacheMessage { +public class GridCacheTtlUpdateRequest extends GridCacheIdMessage { /** */ private 
static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java index 630c79f..fc209aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java @@ -24,7 +24,7 @@ import java.util.Collections; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -37,7 +37,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Base for all messages in replicated cache. 
*/ -public abstract class GridDistributedBaseMessage extends GridCacheMessage implements GridCacheDeployable, +public abstract class GridDistributedBaseMessage extends GridCacheIdMessage implements GridCacheDeployable, GridCacheVersionable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index c966877..dc9e4ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -79,10 +79,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** * @param ctx Cache registry. - * @param startSize Start size. 
*/ - protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, int startSize) { - super(ctx, startSize); + protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx) { + super(ctx); } /** @@ -279,11 +278,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter IgniteCacheOffheapManager offheap = ctx.offheap(); if (modes.offheap) - size += offheap.entriesCount(modes.primary, modes.backup, topVer); + size += offheap.cacheEntriesCount(ctx.cacheId(), modes.primary, modes.backup, topVer); else if (modes.heap) { for (GridDhtLocalPartition locPart : ctx.topology().currentLocalPartitions()) { if ((modes.primary && locPart.primary(topVer)) || (modes.backup && locPart.backup(topVer))) - size += locPart.publicSize(); + size += locPart.publicSize(ctx.cacheId()); } } } @@ -308,7 +307,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer) && modes.primary || ctx.affinity().backupByPartition(ctx.localNode(), part, topVer) && modes.backup) - size += offheap.entriesCount(part); + size += offheap.cacheEntriesCount(ctx.cacheId(), part); } return size; @@ -460,7 +459,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter return false; try { - GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().keysIterator(part); + GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part); if (iter != null) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java index 561c292..c36e633 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java @@ -71,6 +71,16 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public final int partition() { return part; } @@ -135,25 +145,25 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { } switch (writer.state()) { - case 3: + case 2: if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); - case 4: + case 3: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 5: + case 4: if (!writer.writeInt("part", part)) return false; writer.incrementState(); - case 6: + case 5: if (!writer.writeMessage("txId", txId)) return false; @@ -175,7 +185,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { return false; switch (reader.state()) { - case 3: + case 2: flags = reader.readByte("flags"); if (!reader.isLastRead()) @@ -183,7 +193,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { reader.incrementState(); - case 4: + case 3: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -191,7 +201,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { reader.incrementState(); - case 5: + case 4: part = reader.readInt("part"); if (!reader.isLastRead()) @@ -199,7 +209,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { reader.incrementState(); - case 6: + case 5: txId = 
reader.readMessage("txId"); if (!reader.isLastRead()) @@ -219,7 +229,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 6; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 714d781..5e3020d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -594,6 +594,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter dhtVer, txEntry.updateCounter()); else { + assert val != null : txEntry; + cached.innerSet(this, eventNodeId(), nodeId, http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java index 76c7a15..3b41ffa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java @@ -25,7 +25,7 @@ import java.util.NoSuchElementException; import java.util.Set; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -38,24 +38,25 @@ import org.jetbrains.annotations.Nullable; * An implementation of GridCacheConcurrentMap that will delegate all method calls to corresponding local partition. */ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap { - /** Context. */ - private final GridCacheContext ctx; + /** Cache group. */ + private final CacheGroupContext grp; /** - * Constructor. - * @param ctx Context. + * @param grp Cache group. */ - public GridCachePartitionedConcurrentMap(GridCacheContext ctx) { - this.ctx = ctx; + GridCachePartitionedConcurrentMap(CacheGroupContext grp) { + this.grp = grp; } /** + * @param cctx Cache context. * @param key Key. * @param topVer Topology version. * @param create Create flag. * @return Local partition. 
*/ @Nullable private GridDhtLocalPartition localPartition( + GridCacheContext cctx, KeyCacheObject key, AffinityTopologyVersion topVer, boolean create @@ -63,33 +64,33 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap int p = key.partition(); if (p == -1) - p = ctx.affinity().partition(key); + p = cctx.affinity().partition(key); - return ctx.topology().localPartition(p, topVer, create); + return grp.topology().localPartition(p, topVer, create); } /** {@inheritDoc} */ - @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) { - GridDhtLocalPartition part = localPartition(key, AffinityTopologyVersion.NONE, false); + @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) { + GridDhtLocalPartition part = localPartition(ctx, key, AffinityTopologyVersion.NONE, false); if (part == null) return null; - return part.getEntry(key); + return part.getEntry(ctx, key); } /** {@inheritDoc} */ - @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, + @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key, boolean create, boolean touch) { while (true) { - GridDhtLocalPartition part = localPartition(key, topVer, create); + GridDhtLocalPartition part = localPartition(ctx, key, topVer, create); if (part == null) return null; - GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, create, touch); + GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(ctx, topVer, key, create, touch); if (res != null || !create) return res; @@ -102,35 +103,35 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap @Override public int internalSize() { int size = 0; - for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) size += part.internalSize(); 
return size; } /** {@inheritDoc} */ - @Override public int publicSize() { + @Override public int publicSize(int cacheId) { int size = 0; - for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) - size += part.publicSize(); + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) + size += part.publicSize(cacheId); return size; } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { - localPartition(e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e); + @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { + localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(hld, e); } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { - localPartition(e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e); + @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { + localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(hld, e); } /** {@inheritDoc} */ @Override public boolean removeEntry(GridCacheEntryEx entry) { - GridDhtLocalPartition part = localPartition(entry.key(), AffinityTopologyVersion.NONE, false); + GridDhtLocalPartition part = localPartition(entry.context(), entry.key(), AffinityTopologyVersion.NONE, false); if (part == null) return false; @@ -139,12 +140,12 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap } /** {@inheritDoc} */ - @Override public Iterable<GridCacheMapEntry> entries(final CacheEntryPredicate... filter) { + @Override public Iterable<GridCacheMapEntry> entries(final int cacheId, final CacheEntryPredicate... 
filter) { return new Iterable<GridCacheMapEntry>() { @Override public Iterator<GridCacheMapEntry> iterator() { return new PartitionedIterator<GridCacheMapEntry>() { @Override protected Iterator<GridCacheMapEntry> iterator(GridDhtLocalPartition part) { - return part.entries(filter).iterator(); + return part.entries(cacheId, filter).iterator(); } }; } @@ -152,23 +153,10 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap } /** {@inheritDoc} */ - @Override public Iterable<GridCacheMapEntry> allEntries(final CacheEntryPredicate... filter) { - return new Iterable<GridCacheMapEntry>() { - @Override public Iterator<GridCacheMapEntry> iterator() { - return new PartitionedIterator<GridCacheMapEntry>() { - @Override protected Iterator<GridCacheMapEntry> iterator(GridDhtLocalPartition part) { - return part.allEntries(filter).iterator(); - } - }; - } - }; - } - - /** {@inheritDoc} */ - @Override public Set<GridCacheMapEntry> entrySet(final CacheEntryPredicate... filter) { + @Override public Set<GridCacheMapEntry> entrySet(final int cacheId, final CacheEntryPredicate... filter) { return new PartitionedSet<GridCacheMapEntry>() { @Override protected Set<GridCacheMapEntry> set(GridDhtLocalPartition part) { - return part.entrySet(filter); + return part.entrySet(cacheId, filter); } }; } @@ -178,7 +166,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap */ private abstract class PartitionedIterator<T> implements Iterator<T> { /** Partitions iterator. */ - private Iterator<GridDhtLocalPartition> partsIter = ctx.topology().currentLocalPartitions().iterator(); + private Iterator<GridDhtLocalPartition> partsIter = grp.topology().currentLocalPartitions().iterator(); /** Current partition iterator. */ private Iterator<T> currIter = partsIter.hasNext() ? 
iterator(partsIter.next()) : @@ -242,7 +230,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap @Override public int size() { int size = 0; - for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) size += set(part).size(); return size; @@ -250,7 +238,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap /** {@inheritDoc} */ @Override public boolean contains(Object o) { - for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) { + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) { if (set(part).contains(o)) return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 1482137..cace4e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -73,7 +73,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { private GridCacheSharedContext cctx; /** Cache ID. */ - private int cacheId; + private int grpId; /** Logger. */ private final IgniteLogger log; @@ -113,18 +113,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** * @param cctx Context. - * @param cacheId Cache ID. + * @param grpId Group ID. * @param exchFut Exchange ID. 
* @param similarAffKey Key to find caches with similar affinity. */ public GridClientPartitionTopology( GridCacheSharedContext cctx, - int cacheId, + int grpId, GridDhtPartitionsExchangeFuture exchFut, Object similarAffKey ) { this.cctx = cctx; - this.cacheId = cacheId; + this.grpId = grpId; this.similarAffKey = similarAffKey; topVer = exchFut.topologyVersion(); @@ -168,8 +168,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public int cacheId() { - return cacheId; + @Override public int groupId() { + return grpId; } /** {@inheritDoc} */ @@ -283,7 +283,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheStarted(cacheId)) { + if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheGroupStarted(grpId)) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -361,8 +361,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { - return localPartition(1, AffinityTopologyVersion.NONE, create); + @Override public GridDhtLocalPartition localPartition(int p) { + return localPartition(p, AffinityTopologyVersion.NONE, false); } /** {@inheritDoc} */ @@ -550,7 +550,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { lock.readLock().lock(); try { - assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + + if (stopping || node2part == null) + return null; + + assert node2part.valid() : "Invalid node2part [node2part: " + node2part + ", locNodeId=" + cctx.localNodeId() + ", igniteInstanceName=" + cctx.igniteInstanceName() + ']'; @@ -1035,7 +1038,7 @@ public class 
GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() + - ", cacheId=" + cacheId + ']'); + ", grpId=" + grpId + ']'); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java index f80adc5..d9d642a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Affinity assignment request. 
*/ -public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { +public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { /** */ private static final long serialVersionUID = 0L; @@ -46,17 +46,17 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage { /** * @param futId Future ID. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param topVer Topology version. */ public GridDhtAffinityAssignmentRequest( long futId, - int cacheId, + int grpId, AffinityTopologyVersion topVer) { assert topVer != null; this.futId = futId; - this.cacheId = cacheId; + this.grpId = grpId; this.topVer = topVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index 5d82171..4df3fc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -27,20 +27,18 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import 
org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.NotNull; /** * Affinity assignment response. */ -public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { +public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { /** */ private static final long serialVersionUID = 0L; @@ -73,17 +71,17 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** * @param futId Future ID. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param topVer Topology version. * @param affAssignment Affinity assignment. */ public GridDhtAffinityAssignmentResponse( long futId, - int cacheId, + int grpId, @NotNull AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { this.futId = futId; - this.cacheId = cacheId; + this.grpId = grpId; this.topVer = topVer; affAssignmentIds = ids(affAssignment); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 20d1722..8746320 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -33,7 +33,7 @@ import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -74,27 +74,27 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin private final AffinityTopologyVersion topVer; /** */ - private final int cacheId; + private final int grpId; /** * @param ctx Context. - * @param cacheDesc Cache descriptor. + * @param grpDesc Group descriptor. * @param topVer Topology version. * @param discoCache Discovery cache. */ public GridDhtAssignmentFetchFuture( GridCacheSharedContext ctx, - DynamicCacheDescriptor cacheDesc, + CacheGroupDescriptor grpDesc, AffinityTopologyVersion topVer, DiscoCache discoCache ) { - this.ctx = ctx; - cacheId = cacheDesc.cacheId(); this.topVer = topVer; + this.grpId = grpDesc.groupId(); + this.ctx = ctx; id = idGen.getAndIncrement(); - Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId()); + Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId()); LinkedList<ClusterNode> tmp = new LinkedList<>(); @@ -112,10 +112,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin } /** - * @return Cache ID. + * @return Cache group ID. 
*/ - public int cacheId() { - return cacheId; + public int groupId() { + return grpId; } /** @@ -195,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin ", node=" + node + ']'); ctx.io().send(node, - new GridDhtAffinityAssignmentRequest(id, cacheId, topVer), + new GridDhtAffinityAssignmentRequest(id, grpId, topVer), AFFINITY_POOL); // Close window for listener notification.
