ignite-3484
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dd0afb28 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dd0afb28 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dd0afb28 Branch: refs/heads/ignite-3484 Commit: dd0afb28466094b801506da8afa3601bfaebd853 Parents: e3bba83 Author: sboikov <[email protected]> Authored: Wed Sep 6 10:30:04 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 6 11:11:12 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 10 ++++- .../mvcc/CacheCoordinatorsSharedManager.java | 6 ++- .../cache/tree/AbstractDataInnerIO.java | 44 ++++++++++++++++---- .../cache/tree/AbstractDataLeafIO.java | 19 +++++++-- .../cache/tree/CacheDataRowStore.java | 34 ++++++++++++++- .../processors/cache/tree/CacheDataTree.java | 7 +--- .../processors/cache/tree/MvccDataRow.java | 25 ++++++++++- .../processors/cache/tree/MvccSearchRow.java | 14 +++++-- 8 files changed, 133 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 19a940d..ed52b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1552,9 +1552,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - CacheDataRow row = dataTree.findOne(new MvccSearchRow(cacheId, key, topVer, mvccCntr), + // TODO IGNITE-3484: need special findCeiling method. + + GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, topVer, mvccCntr), + null, CacheDataRowAdapter.RowData.NO_KEY); + CacheDataRow row = null; + + if (cur.next()) + row = cur.get(); + afterRowFound(row, key); return row; http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 2657ea5..807d18a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -79,6 +79,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager cctx.gridIO().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener()); } + /** + * @param txVer Tx version. + * @return Counter. + */ public long requestTxCounterOnCoordinator(GridCacheVersion txVer) { assert cctx.localNode().equals(assignHist.currentCoordinator()); @@ -360,7 +364,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager */ private long assignQueryCounter(UUID qryNodeId) { // TODO IGNITE-3478 - return committedCntr.get() + 1; + return 3; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java index 489fd02..0b4664e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java @@ -75,25 +75,55 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i int hash = getHash(pageAddr, idx); long link = getLink(pageAddr, idx); + if (storeMvccVersion()) { + long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx); + long mvccCntr = getMvccUpdateCounter(pageAddr, idx); + + return ((CacheDataTree)tree).rowStore().mvccKeySearchRow(cacheId, hash, link, mvccTopVer, mvccCntr); + } + return ((CacheDataTree)tree).rowStore().keySearchRow(cacheId, hash, link); } /** {@inheritDoc} */ - @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr, - int srcIdx) { - int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx); - long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx); + @Override public void store(long dstPageAddr, + int dstIdx, + BPlusIO<CacheSearchRow> srcIo, + long srcPageAddr, + int srcIdx) + { + RowLinkIO rowIo = ((RowLinkIO)srcIo); + + int hash = rowIo.getHash(srcPageAddr, srcIdx); + long link =rowIo.getLink(srcPageAddr, srcIdx); int off = offset(dstIdx); PageUtils.putLong(dstPageAddr, off, link); - PageUtils.putInt(dstPageAddr, off + 8, hash); + off += 8; + + PageUtils.putInt(dstPageAddr, off, hash); + off += 4; if (storeCacheId()) { - int cacheId = ((RowLinkIO)srcIo).getCacheId(srcPageAddr, srcIdx); + int cacheId = rowIo.getCacheId(srcPageAddr, srcIdx); assert cacheId != CU.UNDEFINED_CACHE_ID; - PageUtils.putInt(dstPageAddr, off + 12, cacheId); + PageUtils.putInt(dstPageAddr, off, cacheId); + off += 4; + } + + if (storeMvccVersion()) { + long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(srcPageAddr, srcIdx); + long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx); + + assert mvccTopVer > 0 : mvccTopVer; + assert mvcCntr != TxMvccVersion.COUNTER_NA; + + PageUtils.putLong(dstPageAddr, off, mvccTopVer); + off += 8; + + PageUtils.putLong(dstPageAddr, off, mvcCntr); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java index 5b94a15..d496103 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java @@ -108,10 +108,21 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp } /** {@inheritDoc} */ - @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long buf, int idx) { - int cacheId = getCacheId(buf, idx); - int hash = getHash(buf, idx); - long link = getLink(buf, idx); + @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long pageAddr, int idx) { + int cacheId = getCacheId(pageAddr, idx); + int hash = getHash(pageAddr, idx); + long link = getLink(pageAddr, idx); + + if (storeMvccVersion()) { + long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx); + long mvccCntr = getMvccUpdateCounter(pageAddr, idx); + + return ((CacheDataTree)tree).rowStore().mvccKeySearchRow(cacheId, + hash, + link, + mvccTopVer, + mvccCntr); + } return ((CacheDataTree)tree).rowStore().keySearchRow(cacheId, hash, link); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java index 28d8919..6774d3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java @@ -56,13 +56,43 @@ public class CacheDataRowStore extends RowStore { CacheSearchRow keySearchRow(int cacheId, int hash, long link) { DataRow dataRow = new DataRow(grp, hash, link, partId, CacheDataRowAdapter.RowData.KEY_ONLY); - if (dataRow.cacheId() == CU.UNDEFINED_CACHE_ID && grp.sharedGroup()) - dataRow.cacheId(cacheId); + initDataRow(dataRow, cacheId); + + return dataRow; + } + + /** + * @param cacheId Cache ID. + * @param hash Hash code. + * @param link Link. + * @param mvccTopVer + * @param mvccCntr + * @return Search row. + */ + CacheSearchRow mvccKeySearchRow(int cacheId, int hash, long link, long mvccTopVer, long mvccCntr) { + MvccDataRow dataRow = new MvccDataRow(grp, + hash, + link, + partId, + CacheDataRowAdapter.RowData.KEY_ONLY, + mvccTopVer, + mvccCntr); + + initDataRow(dataRow, cacheId); return dataRow; } /** + * @param dataRow Data row. + * @param cacheId Cache ID. + */ + private void initDataRow(DataRow dataRow, int cacheId) { + if (dataRow.cacheId() == CU.UNDEFINED_CACHE_ID && grp.sharedGroup()) + dataRow.cacheId(cacheId); + } + + /** * @param cacheId Cache ID. * @param hash Hash code. * @param link Link. http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index 99b1372..e846768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -114,6 +114,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { /** {@inheritDoc} */ @Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row) throws IgniteCheckedException { + assert !grp.mvccEnabled() || row.mvccUpdateTopologyVersion() != 0 || row.getClass() == SearchRow.class; + RowLinkIO io = (RowLinkIO)iox; int cmp; @@ -185,11 +187,6 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { (CacheDataRowAdapter.RowData)flags : CacheDataRowAdapter.RowData.FULL; -// if (grp.mvccEnabled()) { -// long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(pageAddr, idx); -// long mvcCntr = rowIo.getMvccUpdateCounter(pageAddr, idx); -// } - return rowStore.dataRow(cacheId, hash, link, x); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java index 1ab86ca..484e5f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.processors.cache.tree; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** @@ -26,10 +28,29 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; */ public class MvccDataRow extends DataRow { /** */ - private long mvccCntr; + private long mvccTopVer; /** */ - private long mvccTopVer; + private long mvccCntr; + + /** + * @param grp + * @param hash + * @param link + * @param part + * @param rowData + * @param mvccTopVer + * @param mvccCntr + */ + public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long mvccTopVer, long mvccCntr) { + super(grp, hash, link, part, rowData); + + assert mvccTopVer > 0 : mvccTopVer; + assert mvccCntr != TxMvccVersion.COUNTER_NA; + + this.mvccTopVer = mvccTopVer; + this.mvccCntr = mvccCntr; + } /** * @param key http://git-wip-us.apache.org/repos/asf/ignite/blob/dd0afb28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java index da235ab..ae3da98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java @@ -29,6 +29,12 @@ public class MvccSearchRow extends SearchRow { /** */ private long mvccCntr; + /** + * @param cacheId + * @param key + * @param mvccTopVer + * @param mvccCntr + */ public MvccSearchRow(int cacheId, KeyCacheObject key, long mvccTopVer, long mvccCntr) { super(cacheId, key); @@ -37,12 +43,12 @@ public class MvccSearchRow extends SearchRow { } /** {@inheritDoc} */ - @Override public long mvccUpdateCounter() { - return super.mvccUpdateCounter(); + @Override public long mvccUpdateTopologyVersion() { + return mvccTopVer; } /** {@inheritDoc} */ - @Override public long mvccUpdateTopologyVersion() { - return super.mvccUpdateTopologyVersion(); + @Override public long mvccUpdateCounter() { + return mvccCntr; } }
