ignite-3478 Mvcc support for sql indexes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6150f3a0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6150f3a0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6150f3a0 Branch: refs/heads/ignite-3478 Commit: 6150f3a0ad310810606ec5bafbd007804808ff25 Parents: 00bd479 Author: sboikov <[email protected]> Authored: Wed Oct 25 15:15:56 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 25 15:15:56 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 181 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 26 +- .../cache/mvcc/CoordinatorAckRequestTx.java | 2 +- .../cache/mvcc/PreviousCoordinatorQueries.java | 4 +- .../cache/persistence/CacheDataRowAdapter.java | 6 +- .../cache/persistence/tree/BPlusTree.java | 42 +- .../cache/persistence/tree/io/IOVersions.java | 7 + .../cache/persistence/tree/io/PageIO.java | 85 +- .../cache/query/GridCacheQueryManager.java | 11 +- .../cache/tree/AbstractDataInnerIO.java | 8 +- .../cache/tree/AbstractDataLeafIO.java | 6 +- .../cache/tree/CacheDataRowStore.java | 6 +- .../processors/cache/tree/CacheDataTree.java | 2 +- .../cache/tree/CacheIdAwareDataInnerIO.java | 2 +- .../cache/tree/CacheIdAwareDataLeafIO.java | 2 +- .../processors/cache/tree/DataInnerIO.java | 2 +- .../processors/cache/tree/DataLeafIO.java | 2 +- .../internal/processors/cache/tree/DataRow.java | 17 +- .../processors/cache/tree/MvccCleanupRow.java | 48 + .../processors/cache/tree/MvccDataRow.java | 25 +- .../processors/cache/tree/MvccUpdateRow.java | 23 +- .../processors/cache/tree/SearchRow.java | 2 +- .../datastreamer/DataStreamerImpl.java | 2 +- .../processors/query/GridQueryIndexing.java | 8 +- .../processors/query/GridQueryProcessor.java | 43 +- ...IgniteClientCacheInitializationFailTest.java | 4 +- .../cache/mvcc/CacheMvccAbstractTest.java | 123 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 78 +- .../processors/database/BPlusTreeSelfTest.java | 106 +- .../query/h2/opt/GridH2SpatialIndex.java | 5 + .../cache/query/GridCacheTwoStepQuery.java | 18 + .../processors/query/h2/IgniteH2Indexing.java | 41 +- .../query/h2/database/H2PkHashIndex.java | 11 +- .../query/h2/database/H2RowFactory.java | 30 +- .../processors/query/h2/database/H2Tree.java | 102 +- .../query/h2/database/H2TreeIndex.java | 74 +- .../h2/database/H2TreeMvccFilterClosure.java | 106 ++ .../h2/database/io/AbstractH2ExtrasInnerIO.java | 190 +++ .../h2/database/io/AbstractH2ExtrasLeafIO.java | 187 +++ .../query/h2/database/io/AbstractH2InnerIO.java | 106 ++ .../query/h2/database/io/AbstractH2LeafIO.java | 108 ++ .../query/h2/database/io/H2ExtrasInnerIO.java | 115 +- .../query/h2/database/io/H2ExtrasLeafIO.java | 111 +- .../query/h2/database/io/H2IOUtils.java | 113 ++ .../query/h2/database/io/H2InnerIO.java | 41 +- .../query/h2/database/io/H2LeafIO.java | 41 +- .../h2/database/io/H2MvccExtrasInnerIO.java | 77 + .../h2/database/io/H2MvccExtrasLeafIO.java | 76 + .../query/h2/database/io/H2MvccInnerIO.java | 42 + .../query/h2/database/io/H2MvccLeafIO.java | 42 + .../query/h2/database/io/H2RowLinkIO.java | 33 + .../query/h2/opt/GridH2IndexBase.java | 27 +- .../query/h2/opt/GridH2KeyValueRowOnheap.java | 30 +- .../query/h2/opt/GridH2MetaTable.java | 5 + .../query/h2/opt/GridH2PlainRowFactory.java | 17 +- .../query/h2/opt/GridH2QueryContext.java | 27 +- .../processors/query/h2/opt/GridH2Row.java | 24 +- .../query/h2/opt/GridH2RowDescriptor.java | 12 +- .../query/h2/opt/GridH2SearchRow.java | 41 + .../query/h2/opt/GridH2SearchRowAdapter.java | 13 +- .../processors/query/h2/opt/GridH2Table.java | 53 +- .../query/h2/twostep/GridMapQueryExecutor.java | 38 +- .../h2/twostep/GridMergeIndexIterator.java | 16 +- .../h2/twostep/GridReduceQueryExecutor.java | 46 +- .../h2/twostep/msg/GridH2QueryRequest.java | 83 +- .../cache/mvcc/CacheMvccSqlQueriesTest.java | 1568 +++++++++++++++++- 66 files changed, 3955 insertions(+), 587 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 1280e75..8ce47bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -39,9 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs; import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList; @@ -57,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.tree.DataRow; +import org.apache.ignite.internal.processors.cache.tree.MvccCleanupRow; import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound; import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound; import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow; @@ -88,6 +87,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_START_CNTR; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue; @@ -1419,12 +1419,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager // TODO IGNITE-3478: null is passed for loaded from store, need handle better. if (mvccVer == null) { - mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L); + mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, MVCC_START_CNTR, 0L); newVal = true; } else - assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion()); + assert val != null || versionForRemovedValue(mvccVer.coordinatorVersion()); if (val != null) { val.valueBytes(coCtx); @@ -1476,8 +1476,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert !old; - if (val != null) + if (val != null) { incrementSize(cctx.cacheId()); + + if (cctx.queries().enabled()) + cctx.queries().store(updateRow, mvccVer, null); + } } finally { busyLock.leaveBusy(); @@ -1531,6 +1535,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) { assert !primary : updateRow; + + cleanup(cctx, updateRow.cleanupRows(), false); + + return null; } else { rowStore.addRow(updateRow); @@ -1543,7 +1551,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager incrementSize(cctx.cacheId()); } - cleanup(updateRow.cleanupRows(), false); + CacheDataRow oldRow = updateRow.oldRow(); + + if (oldRow != null) + oldRow.key(key); + + GridCacheQueryManager qryMgr = cctx.queries(); + + if (qryMgr.enabled()) + qryMgr.store(updateRow, mvccVer, oldRow); + + updatePendingEntries(cctx, updateRow, oldRow); + + cleanup(cctx, updateRow.cleanupRows(), false); return updateRow.activeTransactions(); } @@ -1590,18 +1610,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) { assert !primary : updateRow; - cleanup(updateRow.cleanupRows(), false); + cleanup(cctx, updateRow.cleanupRows(), false); + + return null; } else { if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL) decrementSize(cacheId); - CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true); + long rmvRowLink = cleanup(cctx, updateRow.cleanupRows(), true); - if (rmvRow == null) + if (rmvRowLink == 0) rowStore.addRow(updateRow); else - updateRow.link(rmvRow.link()); + updateRow.link(rmvRowLink); assert updateRow.link() != 0L; @@ -1610,6 +1632,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert !old; } + CacheDataRow oldRow = updateRow.oldRow(); + + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; + + oldRow.key(key); + + GridCacheQueryManager qryMgr = cctx.queries(); + + if (qryMgr.enabled()) + qryMgr.remove(key, oldRow, mvccVer); + + clearPendingEntries(cctx, oldRow); + } + return updateRow.activeTransactions(); } finally { @@ -1623,26 +1660,40 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + boolean cleanup = cctx.queries().enabled() || hasPendingEntries; + GridCursor<CacheDataRow> cur = dataTree.find( new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), new MvccSearchRow(cacheId, key, 1, 1), - CacheDataRowAdapter.RowData.KEY_ONLY); + cleanup ? CacheDataRowAdapter.RowData.NO_KEY : CacheDataRowAdapter.RowData.LINK_ONLY); boolean first = true; while (cur.next()) { CacheDataRow row = cur.get(); + row.key(key); + assert row.link() != 0 : row; boolean rmvd = dataTree.removex(row); - assert rmvd; + assert rmvd : row; + + boolean rmvdVal = versionForRemovedValue(row.mvccCoordinatorVersion()); + + if (cleanup && !rmvdVal) { + if (cctx.queries().enabled()) + cctx.queries().remove(key, row, null); + + if (first) + clearPendingEntries(cctx, row); + } rowStore.removeRow(row.link()); if (first) { - if (!versionForRemovedValue(row.mvccCoordinatorVersion())) + if (!rmvdVal) decrementSize(cctx.cacheId()); first = false; @@ -1651,36 +1702,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** + * @param cctx Cache context. * @param cleanupRows Rows to cleanup. * @param findRmv {@code True} if need keep removed row entry. - * @return Removed row entry if found. + * @return Removed row link of {@code 0} if not found. * @throws IgniteCheckedException If failed. */ - @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv) + private long cleanup(GridCacheContext cctx, @Nullable List<MvccCleanupRow> cleanupRows, boolean findRmv) throws IgniteCheckedException { - CacheSearchRow rmvRow = null; + long rmvRowLink = 0; if (cleanupRows != null) { + GridCacheQueryManager qryMgr = cctx.queries(); + for (int i = 0; i < cleanupRows.size(); i++) { - CacheSearchRow oldRow = cleanupRows.get(i); + MvccCleanupRow cleanupRow = cleanupRows.get(i); + + assert cleanupRow.link() != 0 : cleanupRow; - assert oldRow.link() != 0L : oldRow; + if (qryMgr.enabled() && !versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) { + CacheDataRow oldRow = dataTree.remove(cleanupRow); - boolean rmvd = dataTree.removex(oldRow); + assert oldRow != null : cleanupRow; - assert rmvd; + qryMgr.remove(oldRow.key(), oldRow, null); + } + else { + boolean rmvd = dataTree.removex(cleanupRow); + + assert rmvd; + } if (findRmv && - rmvRow == null && - versionForRemovedValue(oldRow.mvccCoordinatorVersion())) { - rmvRow = oldRow; + rmvRowLink == 0 && + versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) { + rmvRowLink = cleanupRow.link(); } else - rowStore.removeRow(oldRow.link()); + rowStore.removeRow(cleanupRow.link()); } } - return rmvRow; + return rmvRowLink; } /** {@inheritDoc} */ @@ -1753,32 +1816,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager KeyCacheObject key = newRow.key(); - long expireTime = newRow.expireTime(); - GridCacheQueryManager qryMgr = cctx.queries(); - int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - if (qryMgr.enabled()) - qryMgr.store(newRow, oldRow); + qryMgr.store(newRow, null, oldRow); + + updatePendingEntries(cctx, newRow, oldRow); if (oldRow != null) { assert oldRow.link() != 0 : oldRow; - if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); - if (newRow.link() != oldRow.link()) rowStore.removeRow(oldRow.link()); } + updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value()); + } + + /** + * @param cctx Cache context. + * @param newRow + * @param oldRow + * @throws IgniteCheckedException If failed. + */ + private void updatePendingEntries(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow) + throws IgniteCheckedException + { + long expireTime = newRow.expireTime(); + + int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; + + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + } + if (pendingEntries != null && expireTime != 0) { pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link())); hasPendingEntries = true; } - - updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value()); } /** {@inheritDoc} */ @@ -1792,7 +1871,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheQueryManager qryMgr = cctx.queries(); - qryMgr.store(row, null); + qryMgr.store(row, null, null); // TODO IGNITE-3478. } } @@ -1821,14 +1900,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager */ private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { if (oldRow != null) { - int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - - assert oldRow.link() != 0 : oldRow; - assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId : - "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "]."; - - if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + clearPendingEntries(cctx, oldRow); decrementSize(cctx.cacheId()); } @@ -1836,7 +1908,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr.enabled()) - qryMgr.remove(key, oldRow); + qryMgr.remove(key, oldRow, null); if (oldRow != null) rowStore.removeRow(oldRow.link()); @@ -1844,6 +1916,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null); } + /** + * @param cctx + * @param oldRow + * @throws IgniteCheckedException + */ + private void clearPendingEntries(GridCacheContext cctx, CacheDataRow oldRow) + throws IgniteCheckedException { + int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + assert oldRow.link() != 0 : oldRow; + assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId : + "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "]."; + + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + } + /** {@inheritDoc} */ @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { key.valueBytes(cctx.cacheObjectContext()); @@ -1985,7 +2074,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (curKey != null && row.key().equals(curKey)) continue; - if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) { + if (versionForRemovedValue(rowCrdVerMasked)) { curKey = row.key(); continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index fd3c2af..07e30d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -75,10 +75,10 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS */ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** */ - public static final long COUNTER_NA = 0L; + public static final long MVCC_COUNTER_NA = 0L; /** */ - public static final long START_VER = 1L; + public static final long MVCC_START_CNTR = 1L; /** */ private static final boolean STAT_CNTRS = false; @@ -99,7 +99,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { private volatile MvccCoordinator curCrd; /** */ - private final AtomicLong mvccCntr = new AtomicLong(START_VER); + private final AtomicLong mvccCntr = new AtomicLong(MVCC_START_CNTR); /** */ private final GridAtomicLong committedCntr = new GridAtomicLong(1L); @@ -148,6 +148,18 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** + * @param crdVer Mvcc coordinator version. + * @param cntr Counter. + * @return Always {@code true}. + */ + public static boolean assertMvccVersionValid(long crdVer, long cntr) { + assert unmaskCoordinatorVersion(crdVer) > 0; + assert cntr != MVCC_COUNTER_NA; + + return true; + } + + /** * @param crdVer Coordinator version. * @return Coordinator version with removed value flag. */ @@ -651,7 +663,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) { onTxDone(msg.txCounter()); - if (msg.queryCounter() != COUNTER_NA) { + if (msg.queryCounter() != MVCC_COUNTER_NA) { if (msg.queryCoordinatorVersion() == 0) onQueryDone(nodeId, msg.queryCounter()); else @@ -824,7 +836,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { else qryCnt.incrementAndGet(); - res.init(futId, crdVer, mvccCntr, COUNTER_NA); + res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA); return res; } @@ -909,7 +921,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { // } // } // -// res.init(futId, crdVer, mvccCntr, COUNTER_NA); +// res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA); // // return res; } @@ -1197,7 +1209,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param res Response. */ void onResponse(MvccCoordinatorVersionResponse res) { - assert res.counter() != COUNTER_NA; + assert res.counter() != MVCC_COUNTER_NA; if (lsnr != null) lsnr.onMvccResponse(crd.nodeId(), res); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java index c0512f0..5ab3d3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java @@ -60,7 +60,7 @@ public class CoordinatorAckRequestTx implements MvccCoordinatorMessage { /** {@inheritDoc} */ long queryCounter() { - return CacheCoordinatorsProcessor.COUNTER_NA; + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java index 5c56f40..521e989 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java @@ -26,9 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.jetbrains.annotations.Nullable; /** @@ -165,7 +163,7 @@ class PreviousCoordinatorQueries { */ void onQueryDone(UUID nodeId, long crdVer, long cntr) { assert crdVer != 0; - assert cntr != CacheCoordinatorsProcessor.COUNTER_NA; + assert cntr != CacheCoordinatorsProcessor.MVCC_COUNTER_NA; synchronized (this) { MvccCounter mvccCntr = new MvccCounter(crdVer, cntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java index 1e3a229..29bb6bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java @@ -60,6 +60,7 @@ public class CacheDataRowAdapter implements CacheDataRow { protected CacheObject val; /** */ + @GridToStringInclude protected long expireTime = -1; /** */ @@ -599,7 +600,10 @@ public class CacheDataRowAdapter implements CacheDataRow { KEY_ONLY, /** */ - NO_KEY + NO_KEY, + + /** */ + LINK_ONLY, } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index b31a61f..1ebb1e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -884,12 +884,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param upper Upper bound. + * @param c Filter closure. * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. * @return Cursor. * @throws IgniteCheckedException If failed. */ - private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException { - ForwardCursor cursor = new ForwardCursor(null, upper, x); + private GridCursor<T> findLowerUnbounded(L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException { + ForwardCursor cursor = new ForwardCursor(null, upper, c, x); long firstPageId; @@ -946,13 +947,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @throws IgniteCheckedException If failed. */ public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException { + return find(lower, upper, null, x); + } + + /** + * @param lower Lower bound inclusive or {@code null} if unbounded. + * @param upper Upper bound inclusive or {@code null} if unbounded. + * @param c Filter closure. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * @return Cursor. + * @throws IgniteCheckedException If failed. + */ + public final GridCursor<T> find(L lower, L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException { checkDestroyed(); try { if (lower == null) - return findLowerUnbounded(upper, x); + return findLowerUnbounded(upper, c, x); - ForwardCursor cursor = new ForwardCursor(lower, upper, x); + ForwardCursor cursor = new ForwardCursor(lower, upper, c, x); cursor.find(); @@ -4751,14 +4764,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** */ private int row = -1; + /** */ + private final TreeRowClosure<L, T> c; + /** * @param lowerBound Lower bound. * @param upperBound Upper bound. + * @param c Filter closure. * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. */ - ForwardCursor(L lowerBound, L upperBound, Object x) { + ForwardCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> c, Object x) { super(lowerBound, upperBound); + this.c = c; this.x = x; } @@ -4782,15 +4800,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements if (rows == EMPTY) rows = (T[])new Object[cnt]; + int resCnt = 0; + for (int i = 0; i < cnt; i++) { - T r = getRow(io, pageAddr, startIdx + i, x); + int itemIdx = startIdx + i; - rows = GridArrays.set(rows, i, r); + if (c == null || c.apply(BPlusTree.this, io, pageAddr, itemIdx)) { + T r = getRow(io, pageAddr, itemIdx, x); + + rows = GridArrays.set(rows, resCnt++, r); + } } - GridArrays.clearTail(rows, cnt); + GridArrays.clearTail(rows, resCnt); - return true; + return resCnt > 0; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java index d74d344..9dcad9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.persistence.tree.io; +import org.apache.ignite.internal.util.typedef.internal.S; + /** * Registry for IO versions. */ @@ -99,4 +101,9 @@ public final class IOVersions<V extends PageIO> { return res; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IOVersions.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java index 2de0b8c..0a42129 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java @@ -88,6 +88,12 @@ public abstract class PageIO { /** */ private static IOVersions<? extends BPlusLeafIO<?>> h2LeafIOs; + /** */ + private static IOVersions<? extends BPlusInnerIO<?>> h2MvccInnerIOs; + + /** */ + private static IOVersions<? extends BPlusLeafIO<?>> h2MvccLeafIOs; + /** Maximum payload size. */ public static final short MAX_PAYLOAD_SIZE = 2048; @@ -98,6 +104,12 @@ public abstract class PageIO { private static List<IOVersions<? extends BPlusLeafIO<?>>> h2ExtraLeafIOs = new ArrayList<>(MAX_PAYLOAD_SIZE); /** */ + private static List<IOVersions<? extends BPlusInnerIO<?>>> h2ExtraMvccInnerIOs = new ArrayList<>(MAX_PAYLOAD_SIZE); + + /** */ + private static List<IOVersions<? extends BPlusLeafIO<?>>> h2ExtraMvccLeafIOs = new ArrayList<>(MAX_PAYLOAD_SIZE); + + /** */ public static final int TYPE_OFF = 0; /** */ @@ -184,24 +196,42 @@ public abstract class PageIO { public static final short T_PART_CNTRS = 20; /** Index for payload == 1. */ - public static final short T_H2_EX_REF_LEAF_START = 10000; + public static final short T_H2_EX_REF_LEAF_START = 10_000; /** */ public static final short T_H2_EX_REF_LEAF_END = T_H2_EX_REF_LEAF_START + MAX_PAYLOAD_SIZE - 1; /** */ - public static final short T_H2_EX_REF_INNER_START = 20000; + public static final short T_H2_EX_REF_INNER_START = 20_000; /** */ public static final short T_H2_EX_REF_INNER_END = T_H2_EX_REF_INNER_START + MAX_PAYLOAD_SIZE - 1; /** */ + public static final short T_H2_EX_REF_MVCC_LEAF_START = 23_000; + + /** */ + public static final short T_H2_EX_REF_MVCC_LEAF_END = T_H2_EX_REF_MVCC_LEAF_START + MAX_PAYLOAD_SIZE - 1; + + /** */ + public static final short T_H2_EX_REF_MVCC_INNER_START = 26_000; + + /** */ + public static final short T_H2_EX_REF_MVCC_INNER_END = T_H2_EX_REF_MVCC_INNER_START + MAX_PAYLOAD_SIZE - 1; + + /** */ public static final short T_DATA_REF_MVCC_INNER = 21; /** */ public static final short T_DATA_REF_MVCC_LEAF = 22; /** */ + public static final short T_H2_MVCC_REF_LEAF = 23; + + /** */ + public static final short T_H2_MVCC_REF_INNER = 24; + + /** */ private final int ver; /** */ @@ -334,13 +364,19 @@ public abstract class PageIO { * * @param innerIOs Inner IO versions. * @param leafIOs Leaf IO versions. + * @param mvccInnerIOs Inner IO versions with mvcc enabled. + * @param mvccLeafIOs Leaf IO versions with mvcc enabled. */ public static void registerH2( IOVersions<? extends BPlusInnerIO<?>> innerIOs, - IOVersions<? extends BPlusLeafIO<?>> leafIOs + IOVersions<? extends BPlusLeafIO<?>> leafIOs, + IOVersions<? extends BPlusInnerIO<?>> mvccInnerIOs, + IOVersions<? extends BPlusLeafIO<?>> mvccLeafIOs ) { h2InnerIOs = innerIOs; h2LeafIOs = leafIOs; + h2MvccInnerIOs = mvccInnerIOs; + h2MvccLeafIOs = mvccLeafIOs; } /** @@ -348,8 +384,10 @@ public abstract class PageIO { * * @param innerExtIOs Extra versions. */ - public static void registerH2ExtraInner(IOVersions<? extends BPlusInnerIO<?>> innerExtIOs) { - h2ExtraInnerIOs.add(innerExtIOs); + public static void registerH2ExtraInner(IOVersions<? extends BPlusInnerIO<?>> innerExtIOs, boolean mvcc) { + List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? h2ExtraMvccInnerIOs : h2ExtraInnerIOs; + + ios.add(innerExtIOs); } /** @@ -357,24 +395,30 @@ public abstract class PageIO { * * @param leafExtIOs Extra versions. */ - public static void registerH2ExtraLeaf(IOVersions<? extends BPlusLeafIO<?>> leafExtIOs) { - h2ExtraLeafIOs.add(leafExtIOs); + public static void registerH2ExtraLeaf(IOVersions<? extends BPlusLeafIO<?>> leafExtIOs, boolean mvcc) { + List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? h2ExtraMvccLeafIOs : h2ExtraLeafIOs; + + ios.add(leafExtIOs); } /** * @param idx Index. * @return IOVersions for given idx. */ - public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int idx) { - return h2ExtraInnerIOs.get(idx); + public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int idx, boolean mvcc) { + List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? h2ExtraMvccInnerIOs : h2ExtraInnerIOs; + + return ios.get(idx); } /** * @param idx Index. * @return IOVersions for given idx. */ - public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int idx) { - return h2ExtraLeafIOs.get(idx); + public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int idx, boolean mvcc) { + List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? h2ExtraMvccLeafIOs : h2ExtraLeafIOs; + + return ios.get(idx); } /** @@ -493,13 +537,18 @@ public abstract class PageIO { */ @SuppressWarnings("unchecked") public static <Q extends BPlusIO<?>> Q getBPlusIO(int type, int ver) throws IgniteCheckedException { - if (type >= T_H2_EX_REF_LEAF_START && type <= T_H2_EX_REF_LEAF_END) return (Q)h2ExtraLeafIOs.get(type - T_H2_EX_REF_LEAF_START).forVersion(ver); if (type >= T_H2_EX_REF_INNER_START && type <= T_H2_EX_REF_INNER_END) return (Q)h2ExtraInnerIOs.get(type - T_H2_EX_REF_INNER_START).forVersion(ver); + if (type >= T_H2_EX_REF_MVCC_LEAF_START && type <= T_H2_EX_REF_MVCC_LEAF_END) + return (Q)h2ExtraMvccLeafIOs.get(type - T_H2_EX_REF_MVCC_LEAF_START).forVersion(ver); + + if (type >= T_H2_EX_REF_MVCC_INNER_START && type <= T_H2_EX_REF_MVCC_INNER_END) + return (Q)h2ExtraMvccInnerIOs.get(type - T_H2_EX_REF_MVCC_INNER_START).forVersion(ver); + switch (type) { case T_H2_REF_INNER: if (h2InnerIOs == null) @@ -513,6 +562,18 @@ public abstract class PageIO { return (Q)h2LeafIOs.forVersion(ver); + case T_H2_MVCC_REF_INNER: + if (h2MvccInnerIOs == null) + break; + + return (Q)h2MvccInnerIOs.forVersion(ver); + + case T_H2_MVCC_REF_LEAF: + if (h2MvccLeafIOs == null) + break; + + return (Q)h2MvccLeafIOs.forVersion(ver); + case T_DATA_REF_INNER: return (Q)DataInnerIO.VERSIONS.forVersion(ver); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 59b7613..fb5728a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -382,10 +382,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param newRow New row. + * @param mvccVer Mvcc version for update. * @param prevRow Previous row. * @throws IgniteCheckedException In case of error. */ - public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow) + public void store(CacheDataRow newRow, @Nullable MvccCoordinatorVersion mvccVer, @Nullable CacheDataRow prevRow) throws IgniteCheckedException { assert enabled(); assert newRow != null && newRow.value() != null && newRow.link() != 0 : newRow; @@ -405,7 +406,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } if (qryProcEnabled) - qryProc.store(cctx, newRow, prevRow); + qryProc.store(cctx, newRow, mvccVer, prevRow); } finally { invalidateResultCache(); @@ -417,9 +418,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param key Key. * @param prevRow Previous row. + * @param newVer Mvcc version for remove operation. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) throws IgniteCheckedException { + public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow, @Nullable MvccCoordinatorVersion newVer) + throws IgniteCheckedException { if (!QueryUtils.isEnabled(cctx.config())) return; // No-op. @@ -435,7 +438,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // val may be null if we have no previous value. We should not call processor in this case. if (qryProcEnabled && prevRow != null) - qryProc.remove(cctx, prevRow); + qryProc.remove(cctx, prevRow, newVer); } finally { invalidateResultCache(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java index 31aa2ca..c36d5cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteInClosure; -import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; /** @@ -62,7 +62,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i if (storeMvccVersion()) { assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row; - assert row.mvccCounter() != COUNTER_NA : row; + assert row.mvccCounter() != MVCC_COUNTER_NA : row; PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion()); off += 8; @@ -82,7 +82,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i long mvccCntr = getMvccCounter(pageAddr, idx); assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer; - assert mvccCntr != COUNTER_NA; + assert mvccCntr != MVCC_COUNTER_NA; return ((CacheDataTree)tree).rowStore().mvccRow(cacheId, hash, @@ -128,7 +128,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx); assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer; - assert mvccCntr != COUNTER_NA; + assert mvccCntr != MVCC_COUNTER_NA; PageUtils.putLong(dstPageAddr, off, mvccTopVer); off += 8; http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java index 47d8a6f..d60aef2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteInClosure; -import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; /** @@ -64,7 +64,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp long mvccUpdateCntr = row.mvccCounter(); assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer; - assert mvccUpdateCntr != COUNTER_NA; + assert mvccUpdateCntr != MVCC_COUNTER_NA; PageUtils.putLong(pageAddr, off, mvccCrdVer); off += 8; @@ -100,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx); assert unmaskCoordinatorVersion(mvccUpdateTopVer) > 0 : mvccUpdateCntr; - assert mvccUpdateCntr != COUNTER_NA; + assert mvccUpdateCntr != MVCC_COUNTER_NA; PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer); off += 8; http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java index 85624d5..5537794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java @@ -73,9 +73,9 @@ public class CacheDataRowStore extends RowStore { * @return Search row. */ MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) { - if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) { - if (rowData == CacheDataRowAdapter.RowData.NO_KEY) - return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr); + if (versionForRemovedValue(crdVer)) { + if (rowData == CacheDataRowAdapter.RowData.NO_KEY || rowData == CacheDataRowAdapter.RowData.LINK_ONLY) + return MvccDataRow.removedRowNoKey(link, partId, cacheId, crdVer, mvccCntr); else rowData = CacheDataRowAdapter.RowData.KEY_ONLY; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index a699cd3..9f85640 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -169,7 +169,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { long mvccCntr = io.getMvccCounter(pageAddr, idx); - assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA; + assert row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA; cmp = Long.compare(row.mvccCounter(), mvccCntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java index 3d02b27..36ffd49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java @@ -59,6 +59,6 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO { /** {@inheritDoc} */ @Override public long getMvccCounter(long pageAddr, int idx) { - return CacheCoordinatorsProcessor.COUNTER_NA; + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java index 58ae9ff..ae6fc0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java @@ -59,6 +59,6 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO { /** {@inheritDoc} */ @Override public long getMvccCounter(long pageAddr, int idx) { - return CacheCoordinatorsProcessor.COUNTER_NA; + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java index 19a5c47..98a5450 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java @@ -59,6 +59,6 @@ public final class DataInnerIO extends AbstractDataInnerIO { /** {@inheritDoc} */ @Override public long getMvccCounter(long pageAddr, int idx) { - return CacheCoordinatorsProcessor.COUNTER_NA; + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java index ab10b96..b644e6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java @@ -59,6 +59,6 @@ public final class DataLeafIO extends AbstractDataLeafIO { /** {@inheritDoc} */ @Override public long getMvccCounter(long pageAddr, int idx) { - return CacheCoordinatorsProcessor.COUNTER_NA; + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java index d1e90d4..8853d6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java @@ -50,15 +50,13 @@ public class DataRow extends CacheDataRowAdapter { this.part = part; try { - // We can not init data row lazily because underlying buffer can be concurrently cleared. - initFromLink(grp, rowData); + // We can not init data row lazily outside of entry lock because underlying buffer can be concurrently cleared. + if (rowData != RowData.LINK_ONLY) + initFromLink(grp, rowData); } catch (IgniteCheckedException e) { throw new IgniteException(e); } - - if (key != null) - key.partition(part); } /** @@ -84,11 +82,18 @@ public class DataRow extends CacheDataRowAdapter { /** * */ - protected DataRow() { + DataRow() { super(0); } /** {@inheritDoc} */ + @Override public void key(KeyCacheObject key) { + super.key(key); + + hash = key.hashCode(); + } + + /** {@inheritDoc} */ @Override public int partition() { return part; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java new file mode 100644 index 0000000..92caf70 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.internal.processors.cache.KeyCacheObject; + +/** + * Row contains only link. + */ +public class MvccCleanupRow extends MvccSearchRow { + /** */ + private final long link; + + /** + * @param cacheId Cache ID. + * @param key Key. + * @param crdVer Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param link Link. + */ + MvccCleanupRow(int cacheId, KeyCacheObject key, long crdVer, long mvccCntr, long link) { + super(cacheId, key, crdVer, mvccCntr); + + assert link != 0L; + + this.link = link; + } + + /** {@inheritDoc} */ + @Override public long link() { + return link; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java index 916ea93..a2cf079 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java @@ -18,10 +18,9 @@ package org.apache.ignite.internal.processors.cache.tree; import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.util.typedef.internal.S; -import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid; /** * @@ -34,6 +33,13 @@ public class MvccDataRow extends DataRow { private long mvccCntr; /** + * + */ + private MvccDataRow() { + // No-op. + } + + /** * @param grp Context. * @param hash Key hash. * @param link Link. @@ -42,24 +48,17 @@ public class MvccDataRow extends DataRow { * @param crdVer Mvcc coordinator version. * @param mvccCntr Mvcc counter. */ - MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) { + public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) { super(grp, hash, link, part, rowData); - assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer; - assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA; + assertMvccVersionValid(crdVer, mvccCntr); this.crdVer = crdVer; this.mvccCntr = mvccCntr; } /** - * - */ - private MvccDataRow() { - // No-op. - } - - /** + * @param link Link. * @param part Partition. * @param cacheId Cache ID. * @param crdVer Mvcc coordinator version. @@ -67,12 +66,14 @@ public class MvccDataRow extends DataRow { * @return Row. */ static MvccDataRow removedRowNoKey( + long link, int part, int cacheId, long crdVer, long mvccCntr) { MvccDataRow row = new MvccDataRow(); + row.link = link; row.cacheId = cacheId; row.part = part; row.crdVer = crdVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java index fb2a6cf..0b37a94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; @@ -34,6 +33,7 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue; @@ -51,7 +51,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C private GridLongList activeTxs; /** */ - private List<CacheSearchRow> cleanupRows; + private List<MvccCleanupRow> cleanupRows; /** */ private final MvccCoordinatorVersion mvccVer; @@ -66,7 +66,9 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C * @param key Key. * @param val Value. * @param ver Version. + * @param expireTime Expire time. * @param mvccVer Mvcc version. + * @param needOld {@code True} if need previous value. * @param part Partition. * @param cacheId Cache ID. */ @@ -109,7 +111,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C /** * @return Rows which are safe to cleanup. */ - public List<CacheSearchRow> cleanupRows() { + public List<MvccCleanupRow> cleanupRows() { return cleanupRows; } @@ -175,8 +177,6 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C if (needOld) oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); } - res = versionForRemovedValue(rowCrdVerMasked) ? - UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL; } } @@ -199,26 +199,25 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C int cmp; + long rowCntr = rowIo.getMvccCounter(pageAddr, idx); + if (crdVer == rowCrdVer) - cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx)); + cmp = Long.compare(mvccVer.cleanupVersion(), rowCntr); else cmp = 1; if (cmp >= 0) { // Do not cleanup oldest version. if (canCleanup) { - CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx); - - assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row; + assert assertMvccVersionValid(rowCrdVer, rowCntr); // Should not be possible to cleanup active tx. - assert rowCrdVer != crdVer - || !mvccVer.activeTransactions().contains(row.mvccCounter()); + assert rowCrdVer != crdVer || !mvccVer.activeTransactions().contains(rowCntr); if (cleanupRows == null) cleanupRows = new ArrayList<>(); - cleanupRows.add(row); + cleanupRows.add(new MvccCleanupRow(cacheId, key, rowCrdVerMasked, rowCntr, rowIo.getLink(pageAddr, idx))); } else canCleanup = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java index 5bdc495..5fd7e8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java @@ -83,7 +83,7 @@ public class SearchRow implements CacheSearchRow { /** {@inheritDoc} */ @Override public long mvccCounter() { - return CacheCoordinatorsProcessor.COUNTER_NA; + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index e6300a9..dab2ec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** Version which is less then any version generated on coordinator. */ private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER = - new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L); + new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.MVCC_START_CNTR, 0L); /** Cache receiver. */ private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER; http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index b0a3831..5bd4bc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -31,6 +31,7 @@ import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -217,10 +218,13 @@ public interface GridQueryIndexing { * @param cctx Cache context. * @param type Type descriptor. * @param row New row. + * @param newVer Version of new mvcc value inserted for the same key. * @throws IgniteCheckedException If failed. */ - public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row) - throws IgniteCheckedException; + public void store(GridCacheContext cctx, + GridQueryTypeDescriptor type, + CacheDataRow row, + @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException; /** * Removes index entry by key. http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 4886b1b..3b3dec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -1700,14 +1701,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. * @param newRow New row. + * @param mvccVer Mvcc version for update. * @param prevRow Previous row. * @throws IgniteCheckedException In case of error. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - public void store(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow prevRow) - throws IgniteCheckedException { + public void store(GridCacheContext cctx, + CacheDataRow newRow, + @Nullable MvccCoordinatorVersion mvccVer, + @Nullable CacheDataRow prevRow) throws IgniteCheckedException + { assert cctx != null; assert newRow != null; + assert !cctx.mvccEnabled() || mvccVer != null; KeyCacheObject key = newRow.key(); @@ -1734,14 +1740,26 @@ public class GridQueryProcessor extends GridProcessorAdapter { prevRow.value(), false); - if (prevValDesc != null && prevValDesc != desc) + if (prevValDesc != null && prevValDesc != desc) { idx.remove(cctx, prevValDesc, prevRow); + + prevRow = null; + } } if (desc == null) return; - idx.store(cctx, desc, newRow); + if (cctx.mvccEnabled()) { + // Add new mvcc value. + idx.store(cctx, desc, newRow, null); + + // Set info about more recent version for previous record. + if (prevRow != null) + idx.store(cctx, desc, prevRow, mvccVer); + } + else + idx.store(cctx, desc, newRow, null); } finally { busyLock.leaveBusy(); @@ -2304,12 +2322,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. - * @param val Row. + * @param val Value removed from cache. + * @param newVer Mvcc version for remove operation. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void remove(GridCacheContext cctx, CacheDataRow val) + public void remove(GridCacheContext cctx, CacheDataRow val, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { assert val != null; + assert cctx.mvccEnabled() || newVer == null; if (log.isDebugEnabled()) log.debug("Remove [cacheName=" + cctx.name() + ", key=" + val.key()+ ", val=" + val.value() + "]"); @@ -2330,7 +2350,16 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (desc == null) return; - idx.remove(cctx, desc, val); + if (cctx.mvccEnabled()) { + if (newVer != null) { + // Set info about more recent version for previous record. + idx.store(cctx, desc, val, newVer); + } + else + idx.remove(cctx, desc, val); + } + else + idx.remove(cctx, desc, val); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index b0b758a..d77fb81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryIndexing; @@ -310,7 +311,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT } /** {@inheritDoc} */ - @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow val) { + @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row, + @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java index 999144f..1949cd2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -160,6 +161,71 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { } /** + * @param cfgC Optional closure applied to cache configuration. + * @throws Exception If failed. + */ + final void cacheRecreate(@Nullable IgniteInClosure<CacheConfiguration> cfgC) throws Exception { + Ignite srv0 = startGrid(0); + + final int PARTS = 64; + + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS); + + if (cfgC != null) + cfgC.apply(ccfg); + + IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg); + + for (int k = 0; k < PARTS * 2; k++) { + assertNull(cache.get(k)); + + int vals = k % 3 + 1; + + for (int v = 0; v < vals; v++) + cache.put(k, new MvccTestAccount(v, 1)); + + assertEquals(vals - 1, cache.get(k).val); + } + + srv0.destroyCache(cache.getName()); + + ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS); + + if (cfgC != null) + cfgC.apply(ccfg); + + cache = (IgniteCache)srv0.createCache(ccfg); + + for (int k = 0; k < PARTS * 2; k++) { + assertNull(cache.get(k)); + + int vals = k % 3 + 2; + + for (int v = 0; v < vals; v++) + cache.put(k, new MvccTestAccount(v + 100, 1)); + + assertEquals(vals - 1 + 100, cache.get(k).val); + } + + srv0.destroyCache(cache.getName()); + + ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS); + + IgniteCache<Long, Long> cache0 = (IgniteCache)srv0.createCache(ccfg); + + for (long k = 0; k < PARTS * 2; k++) { + assertNull(cache0.get(k)); + + int vals = (int)(k % 3 + 2); + + for (long v = 0; v < vals; v++) + cache0.put(k, v); + + assertEquals((long)(vals - 1), (Object)cache0.get(k)); + } + } + + /** * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -332,13 +398,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { Map<Integer, Integer> lastUpdateCntrs = new HashMap<>(); + SqlFieldsQuery sumQry = new SqlFieldsQuery("select sum(val) from MvccTestAccount"); + while (!stop.get()) { while (keys.size() < ACCOUNTS) keys.add(rnd.nextInt(ACCOUNTS)); TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - Map<Integer, MvccTestAccount> accounts; + Map<Integer, MvccTestAccount> accounts = null; try { switch (readMode) { @@ -378,7 +446,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { for (List<?> row : cache.cache.query(qry)) { Integer id = (Integer)row.get(0); - Integer val = (Integer)row.get(0); + Integer val = (Integer)row.get(1); MvccTestAccount old = accounts.put(id, new MvccTestAccount(val, 1)); @@ -389,6 +457,18 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { break; } + case SQL_SUM: { + List<List<?>> res = cache.cache.query(sumQry).getAll(); + + assertEquals(1, res.size()); + + BigDecimal sum = (BigDecimal)res.get(0).get(0); + + assertEquals(ACCOUNT_START_VAL * ACCOUNTS, sum.intValue()); + + break; + } + default: { fail(); @@ -400,29 +480,31 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { cache.readUnlock(); } - if (!withRmvs) - assertEquals(ACCOUNTS, accounts.size()); + if (accounts != null) { + if (!withRmvs) + assertEquals(ACCOUNTS, accounts.size()); - int sum = 0; + int sum = 0; - for (int i = 0; i < ACCOUNTS; i++) { - MvccTestAccount account = accounts.get(i); + for (int i = 0; i < ACCOUNTS; i++) { + MvccTestAccount account = accounts.get(i); - if (account != null) { - sum += account.val; + if (account != null) { + sum += account.val; - Integer cntr = lastUpdateCntrs.get(i); + Integer cntr = lastUpdateCntrs.get(i); - if (cntr != null) - assertTrue(cntr <= account.updateCnt); + if (cntr != null) + assertTrue(cntr <= account.updateCnt); - lastUpdateCntrs.put(i, cntr); + lastUpdateCntrs.put(i, cntr); + } + else + assertTrue(withRmvs); } - else - assertTrue(withRmvs); - } - assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum); + assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum); + } } if (idx == 0) { @@ -713,7 +795,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { * @param node Node. * @throws Exception If failed. */ - final void checkActiveQueriesCleanup(Ignite node) throws Exception { + protected final void checkActiveQueriesCleanup(Ignite node) throws Exception { final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators(); assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition( @@ -827,7 +909,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { SCAN, /** */ - SQL_ALL + SQL_ALL, + + /** */ + SQL_SUM } /**
