ignite-3478 Mvcc support for sql indexes

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6150f3a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6150f3a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6150f3a0

Branch: refs/heads/ignite-3478
Commit: 6150f3a0ad310810606ec5bafbd007804808ff25
Parents: 00bd479
Author: sboikov <[email protected]>
Authored: Wed Oct 25 15:15:56 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Oct 25 15:15:56 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  181 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |   26 +-
 .../cache/mvcc/CoordinatorAckRequestTx.java     |    2 +-
 .../cache/mvcc/PreviousCoordinatorQueries.java  |    4 +-
 .../cache/persistence/CacheDataRowAdapter.java  |    6 +-
 .../cache/persistence/tree/BPlusTree.java       |   42 +-
 .../cache/persistence/tree/io/IOVersions.java   |    7 +
 .../cache/persistence/tree/io/PageIO.java       |   85 +-
 .../cache/query/GridCacheQueryManager.java      |   11 +-
 .../cache/tree/AbstractDataInnerIO.java         |    8 +-
 .../cache/tree/AbstractDataLeafIO.java          |    6 +-
 .../cache/tree/CacheDataRowStore.java           |    6 +-
 .../processors/cache/tree/CacheDataTree.java    |    2 +-
 .../cache/tree/CacheIdAwareDataInnerIO.java     |    2 +-
 .../cache/tree/CacheIdAwareDataLeafIO.java      |    2 +-
 .../processors/cache/tree/DataInnerIO.java      |    2 +-
 .../processors/cache/tree/DataLeafIO.java       |    2 +-
 .../internal/processors/cache/tree/DataRow.java |   17 +-
 .../processors/cache/tree/MvccCleanupRow.java   |   48 +
 .../processors/cache/tree/MvccDataRow.java      |   25 +-
 .../processors/cache/tree/MvccUpdateRow.java    |   23 +-
 .../processors/cache/tree/SearchRow.java        |    2 +-
 .../datastreamer/DataStreamerImpl.java          |    2 +-
 .../processors/query/GridQueryIndexing.java     |    8 +-
 .../processors/query/GridQueryProcessor.java    |   43 +-
 ...IgniteClientCacheInitializationFailTest.java |    4 +-
 .../cache/mvcc/CacheMvccAbstractTest.java       |  123 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |   78 +-
 .../processors/database/BPlusTreeSelfTest.java  |  106 +-
 .../query/h2/opt/GridH2SpatialIndex.java        |    5 +
 .../cache/query/GridCacheTwoStepQuery.java      |   18 +
 .../processors/query/h2/IgniteH2Indexing.java   |   41 +-
 .../query/h2/database/H2PkHashIndex.java        |   11 +-
 .../query/h2/database/H2RowFactory.java         |   30 +-
 .../processors/query/h2/database/H2Tree.java    |  102 +-
 .../query/h2/database/H2TreeIndex.java          |   74 +-
 .../h2/database/H2TreeMvccFilterClosure.java    |  106 ++
 .../h2/database/io/AbstractH2ExtrasInnerIO.java |  190 +++
 .../h2/database/io/AbstractH2ExtrasLeafIO.java  |  187 +++
 .../query/h2/database/io/AbstractH2InnerIO.java |  106 ++
 .../query/h2/database/io/AbstractH2LeafIO.java  |  108 ++
 .../query/h2/database/io/H2ExtrasInnerIO.java   |  115 +-
 .../query/h2/database/io/H2ExtrasLeafIO.java    |  111 +-
 .../query/h2/database/io/H2IOUtils.java         |  113 ++
 .../query/h2/database/io/H2InnerIO.java         |   41 +-
 .../query/h2/database/io/H2LeafIO.java          |   41 +-
 .../h2/database/io/H2MvccExtrasInnerIO.java     |   77 +
 .../h2/database/io/H2MvccExtrasLeafIO.java      |   76 +
 .../query/h2/database/io/H2MvccInnerIO.java     |   42 +
 .../query/h2/database/io/H2MvccLeafIO.java      |   42 +
 .../query/h2/database/io/H2RowLinkIO.java       |   33 +
 .../query/h2/opt/GridH2IndexBase.java           |   27 +-
 .../query/h2/opt/GridH2KeyValueRowOnheap.java   |   30 +-
 .../query/h2/opt/GridH2MetaTable.java           |    5 +
 .../query/h2/opt/GridH2PlainRowFactory.java     |   17 +-
 .../query/h2/opt/GridH2QueryContext.java        |   27 +-
 .../processors/query/h2/opt/GridH2Row.java      |   24 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   12 +-
 .../query/h2/opt/GridH2SearchRow.java           |   41 +
 .../query/h2/opt/GridH2SearchRowAdapter.java    |   13 +-
 .../processors/query/h2/opt/GridH2Table.java    |   53 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   38 +-
 .../h2/twostep/GridMergeIndexIterator.java      |   16 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   46 +-
 .../h2/twostep/msg/GridH2QueryRequest.java      |   83 +-
 .../cache/mvcc/CacheMvccSqlQueriesTest.java     | 1568 +++++++++++++++++-
 66 files changed, 3955 insertions(+), 587 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 1280e75..8ce47bf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,9 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
 import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -57,6 +55,7 @@ import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccCleanupRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
@@ -88,6 +87,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_START_CNTR;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
@@ -1419,12 +1419,12 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 // TODO IGNITE-3478: null is passed for loaded from store, 
need handle better.
                 if (mvccVer == null) {
-                    mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, 
CacheCoordinatorsProcessor.START_VER, 0L);
+                    mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, 
MVCC_START_CNTR, 0L);
 
                     newVal = true;
                 }
                 else
-                    assert val != null || 
CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+                    assert val != null || 
versionForRemovedValue(mvccVer.coordinatorVersion());
 
                 if (val != null) {
                     val.valueBytes(coCtx);
@@ -1476,8 +1476,12 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 assert !old;
 
-                if (val != null)
+                if (val != null) {
                     incrementSize(cctx.cacheId());
+
+                    if (cctx.queries().enabled())
+                        cctx.queries().store(updateRow, mvccVer, null);
+                }
             }
             finally {
                 busyLock.leaveBusy();
@@ -1531,6 +1535,10 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
                     assert !primary : updateRow;
+
+                    cleanup(cctx, updateRow.cleanupRows(), false);
+
+                    return null;
                 }
                 else {
                     rowStore.addRow(updateRow);
@@ -1543,7 +1551,19 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                         incrementSize(cctx.cacheId());
                 }
 
-                cleanup(updateRow.cleanupRows(), false);
+                CacheDataRow oldRow = updateRow.oldRow();
+
+                if (oldRow != null)
+                    oldRow.key(key);
+
+                GridCacheQueryManager qryMgr = cctx.queries();
+
+                if (qryMgr.enabled())
+                    qryMgr.store(updateRow, mvccVer, oldRow);
+
+                updatePendingEntries(cctx, updateRow, oldRow);
+
+                cleanup(cctx, updateRow.cleanupRows(), false);
 
                 return updateRow.activeTransactions();
             }
@@ -1590,18 +1610,20 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
                     assert !primary : updateRow;
 
-                    cleanup(updateRow.cleanupRows(), false);
+                    cleanup(cctx, updateRow.cleanupRows(), false);
+
+                    return null;
                 }
                 else {
                     if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
                         decrementSize(cacheId);
 
-                    CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), 
true);
+                    long rmvRowLink = cleanup(cctx, updateRow.cleanupRows(), 
true);
 
-                    if (rmvRow == null)
+                    if (rmvRowLink == 0)
                         rowStore.addRow(updateRow);
                     else
-                        updateRow.link(rmvRow.link());
+                        updateRow.link(rmvRowLink);
 
                     assert updateRow.link() != 0L;
 
@@ -1610,6 +1632,21 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                     assert !old;
                 }
 
+                CacheDataRow oldRow = updateRow.oldRow();
+
+                if (oldRow != null) {
+                    assert oldRow.link() != 0 : oldRow;
+
+                    oldRow.key(key);
+
+                    GridCacheQueryManager qryMgr = cctx.queries();
+
+                    if (qryMgr.enabled())
+                        qryMgr.remove(key, oldRow, mvccVer);
+
+                    clearPendingEntries(cctx, oldRow);
+                }
+
                 return updateRow.activeTransactions();
             }
             finally {
@@ -1623,26 +1660,40 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
 
+            boolean cleanup = cctx.queries().enabled() || hasPendingEntries;
+
             GridCursor<CacheDataRow> cur = dataTree.find(
                 new MvccSearchRow(cacheId, key, Long.MAX_VALUE, 
Long.MAX_VALUE),
                 new MvccSearchRow(cacheId, key, 1, 1),
-                CacheDataRowAdapter.RowData.KEY_ONLY);
+                cleanup ? CacheDataRowAdapter.RowData.NO_KEY : 
CacheDataRowAdapter.RowData.LINK_ONLY);
 
             boolean first = true;
 
             while (cur.next()) {
                 CacheDataRow row = cur.get();
 
+                row.key(key);
+
                 assert row.link() != 0 : row;
 
                 boolean rmvd = dataTree.removex(row);
 
-                assert rmvd;
+                assert rmvd : row;
+
+                boolean rmvdVal = 
versionForRemovedValue(row.mvccCoordinatorVersion());
+
+                if (cleanup && !rmvdVal) {
+                    if (cctx.queries().enabled())
+                        cctx.queries().remove(key, row, null);
+
+                    if (first)
+                        clearPendingEntries(cctx, row);
+                }
 
                 rowStore.removeRow(row.link());
 
                 if (first) {
-                    if (!versionForRemovedValue(row.mvccCoordinatorVersion()))
+                    if (!rmvdVal)
                         decrementSize(cctx.cacheId());
 
                     first = false;
@@ -1651,36 +1702,48 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
 
         /**
+         * @param cctx Cache context.
          * @param cleanupRows Rows to cleanup.
          * @param findRmv {@code True} if need keep removed row entry.
-         * @return Removed row entry if found.
+         * @return Removed row link or {@code 0} if not found.
          * @throws IgniteCheckedException If failed.
          */
-        @Nullable private CacheSearchRow cleanup(@Nullable 
List<CacheSearchRow> cleanupRows, boolean findRmv)
+        private long cleanup(GridCacheContext cctx, @Nullable 
List<MvccCleanupRow> cleanupRows, boolean findRmv)
             throws IgniteCheckedException {
-            CacheSearchRow rmvRow = null;
+            long rmvRowLink = 0;
 
             if (cleanupRows != null) {
+                GridCacheQueryManager qryMgr = cctx.queries();
+
                 for (int i = 0; i < cleanupRows.size(); i++) {
-                    CacheSearchRow oldRow = cleanupRows.get(i);
+                    MvccCleanupRow cleanupRow = cleanupRows.get(i);
+
+                    assert cleanupRow.link() != 0 : cleanupRow;
 
-                    assert oldRow.link() != 0L : oldRow;
+                    if (qryMgr.enabled() && 
!versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) {
+                        CacheDataRow oldRow = dataTree.remove(cleanupRow);
 
-                    boolean rmvd = dataTree.removex(oldRow);
+                        assert oldRow != null : cleanupRow;
 
-                    assert rmvd;
+                        qryMgr.remove(oldRow.key(), oldRow, null);
+                    }
+                    else {
+                        boolean rmvd = dataTree.removex(cleanupRow);
+
+                        assert rmvd;
+                    }
 
                     if (findRmv &&
-                        rmvRow == null &&
-                        
versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
-                        rmvRow = oldRow;
+                        rmvRowLink == 0 &&
+                        
versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) {
+                        rmvRowLink = cleanupRow.link();
                     }
                     else
-                        rowStore.removeRow(oldRow.link());
+                        rowStore.removeRow(cleanupRow.link());
                 }
             }
 
-            return rmvRow;
+            return rmvRowLink;
         }
 
         /** {@inheritDoc} */
@@ -1753,32 +1816,48 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
             KeyCacheObject key = newRow.key();
 
-            long expireTime = newRow.expireTime();
-
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
-
             if (qryMgr.enabled())
-                qryMgr.store(newRow, oldRow);
+                qryMgr.store(newRow, null, oldRow);
+
+            updatePendingEntries(cctx, newRow, oldRow);
 
             if (oldRow != null) {
                 assert oldRow.link() != 0 : oldRow;
 
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
-
                 if (newRow.link() != oldRow.link())
                     rowStore.removeRow(oldRow.link());
             }
 
+            updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : 
null), newRow.value());
+        }
+
+        /**
+         * @param cctx Cache context.
+         * @param newRow New row.
+         * @param oldRow Old row, or {@code null} if there is none.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void updatePendingEntries(GridCacheContext cctx, CacheDataRow 
newRow, @Nullable CacheDataRow oldRow)
+            throws IgniteCheckedException
+        {
+            long expireTime = newRow.expireTime();
+
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
+
+            if (oldRow != null) {
+                assert oldRow.link() != 0 : oldRow;
+
+                if (pendingEntries != null && oldRow.expireTime() != 0)
+                    pendingEntries.removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
+            }
+
             if (pendingEntries != null && expireTime != 0) {
                 pendingEntries.putx(new PendingRow(cacheId, expireTime, 
newRow.link()));
 
                 hasPendingEntries = true;
             }
-
-            updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : 
null), newRow.value());
         }
 
         /** {@inheritDoc} */
@@ -1792,7 +1871,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 GridCacheQueryManager qryMgr = cctx.queries();
 
-                qryMgr.store(row, null);
+                qryMgr.store(row, null, null); // TODO IGNITE-3478.
             }
         }
 
@@ -1821,14 +1900,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
          */
         private void finishRemove(GridCacheContext cctx, KeyCacheObject key, 
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
             if (oldRow != null) {
-                int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
-
-                assert oldRow.link() != 0 : oldRow;
-                assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == 
cacheId :
-                    "Incorrect cache ID [expected=" + cacheId + ", actual=" + 
oldRow.cacheId() + "].";
-
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
+                clearPendingEntries(cctx, oldRow);
 
                 decrementSize(cctx.cacheId());
             }
@@ -1836,7 +1908,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             GridCacheQueryManager qryMgr = cctx.queries();
 
             if (qryMgr.enabled())
-                qryMgr.remove(key, oldRow);
+                qryMgr.remove(key, oldRow, null);
 
             if (oldRow != null)
                 rowStore.removeRow(oldRow.link());
@@ -1844,6 +1916,23 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : 
null), null);
         }
 
+        /**
+         * @param cctx Cache context.
+         * @param oldRow Old row.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void clearPendingEntries(GridCacheContext cctx, CacheDataRow 
oldRow)
+            throws IgniteCheckedException {
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
+
+            assert oldRow.link() != 0 : oldRow;
+            assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == 
cacheId :
+                "Incorrect cache ID [expected=" + cacheId + ", actual=" + 
oldRow.cacheId() + "].";
+
+            if (pendingEntries != null && oldRow.expireTime() != 0)
+                pendingEntries.removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
+        }
+
         /** {@inheritDoc} */
         @Override public CacheDataRow find(GridCacheContext cctx, 
KeyCacheObject key) throws IgniteCheckedException {
             key.valueBytes(cctx.cacheObjectContext());
@@ -1985,7 +2074,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                         if (curKey != null && row.key().equals(curKey))
                             continue;
 
-                        if 
(CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+                        if (versionForRemovedValue(rowCrdVerMasked)) {
                             curKey = row.key();
 
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index fd3c2af..07e30d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -75,10 +75,10 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  */
 public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     /** */
-    public static final long COUNTER_NA = 0L;
+    public static final long MVCC_COUNTER_NA = 0L;
 
     /** */
-    public static final long START_VER = 1L;
+    public static final long MVCC_START_CNTR = 1L;
 
     /** */
     private static final boolean STAT_CNTRS = false;
@@ -99,7 +99,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     private volatile MvccCoordinator curCrd;
 
     /** */
-    private final AtomicLong mvccCntr = new AtomicLong(START_VER);
+    private final AtomicLong mvccCntr = new AtomicLong(MVCC_START_CNTR);
 
     /** */
     private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
@@ -148,6 +148,18 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param crdVer Mvcc coordinator version.
+     * @param cntr Counter.
+     * @return Always {@code true}.
+     */
+    public static boolean assertMvccVersionValid(long crdVer, long cntr) {
+        assert unmaskCoordinatorVersion(crdVer) > 0;
+        assert cntr != MVCC_COUNTER_NA;
+
+        return true;
+    }
+
+    /**
      * @param crdVer Coordinator version.
      * @return Coordinator version with removed value flag.
      */
@@ -651,7 +663,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     private void processCoordinatorTxAckRequest(UUID nodeId, 
CoordinatorAckRequestTx msg) {
         onTxDone(msg.txCounter());
 
-        if (msg.queryCounter() != COUNTER_NA) {
+        if (msg.queryCounter() != MVCC_COUNTER_NA) {
             if (msg.queryCoordinatorVersion() == 0)
                 onQueryDone(nodeId, msg.queryCounter());
             else
@@ -824,7 +836,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
             else
                 qryCnt.incrementAndGet();
 
-            res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+            res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA);
 
             return res;
         }
@@ -909,7 +921,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
 //            }
 //        }
 //
-//        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+//        res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA);
 //
 //        return res;
     }
@@ -1197,7 +1209,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
          * @param res Response.
          */
         void onResponse(MvccCoordinatorVersionResponse res) {
-            assert res.counter() != COUNTER_NA;
+            assert res.counter() != MVCC_COUNTER_NA;
 
             if (lsnr != null)
                 lsnr.onMvccResponse(crd.nodeId(), res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
index c0512f0..5ab3d3b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -60,7 +60,7 @@ public class CoordinatorAckRequestTx implements 
MvccCoordinatorMessage {
 
     /** {@inheritDoc} */
     long queryCounter() {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 5c56f40..521e989 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -26,9 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -165,7 +163,7 @@ class PreviousCoordinatorQueries {
      */
     void onQueryDone(UUID nodeId, long crdVer, long cntr) {
         assert crdVer != 0;
-        assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+        assert cntr != CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 
         synchronized (this) {
             MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 1e3a229..29bb6bf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -60,6 +60,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
     protected CacheObject val;
 
     /** */
+    @GridToStringInclude
     protected long expireTime = -1;
 
     /** */
@@ -599,7 +600,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
         KEY_ONLY,
 
         /** */
-        NO_KEY
+        NO_KEY,
+
+        /** */
+        LINK_ONLY,
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index b31a61f..1ebb1e9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -884,12 +884,13 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
     /**
      * @param upper Upper bound.
+     * @param c Filter closure.
      * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    private GridCursor<T> findLowerUnbounded(L upper, Object x) throws 
IgniteCheckedException {
-        ForwardCursor cursor = new ForwardCursor(null, upper, x);
+    private GridCursor<T> findLowerUnbounded(L upper, TreeRowClosure<L, T> c, 
Object x) throws IgniteCheckedException {
+        ForwardCursor cursor = new ForwardCursor(null, upper, c, x);
 
         long firstPageId;
 
@@ -946,13 +947,25 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * @throws IgniteCheckedException If failed.
      */
     public final GridCursor<T> find(L lower, L upper, Object x) throws 
IgniteCheckedException {
+        return find(lower, upper, null, x);
+    }
+
+    /**
+     * @param lower Lower bound inclusive or {@code null} if unbounded.
+     * @param upper Upper bound inclusive or {@code null} if unbounded.
+     * @param c Filter closure.
+     * @param x Implementation specific argument, {@code null} always means 
that we need to return full detached data row.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final GridCursor<T> find(L lower, L upper, TreeRowClosure<L, T> c, 
Object x) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
             if (lower == null)
-                return findLowerUnbounded(upper, x);
+                return findLowerUnbounded(upper, c, x);
 
-            ForwardCursor cursor = new ForwardCursor(lower, upper, x);
+            ForwardCursor cursor = new ForwardCursor(lower, upper, c, x);
 
             cursor.find();
 
@@ -4751,14 +4764,19 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         /** */
         private int row = -1;
 
+        /** */
+        private final TreeRowClosure<L, T> c;
+
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
+         * @param c Filter closure.
          * @param x Implementation specific argument, {@code null} always 
means that we need to return full detached data row.
          */
-        ForwardCursor(L lowerBound, L upperBound, Object x) {
+        ForwardCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> c, 
Object x) {
             super(lowerBound, upperBound);
 
+            this.c = c;
             this.x = x;
         }
 
@@ -4782,15 +4800,21 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             if (rows == EMPTY)
                 rows = (T[])new Object[cnt];
 
+            int resCnt = 0;
+
             for (int i = 0; i < cnt; i++) {
-                T r = getRow(io, pageAddr, startIdx + i, x);
+                int itemIdx = startIdx + i;
 
-                rows = GridArrays.set(rows, i, r);
+                if (c == null || c.apply(BPlusTree.this, io, pageAddr, 
itemIdx)) {
+                    T r = getRow(io, pageAddr, itemIdx, x);
+
+                    rows = GridArrays.set(rows, resCnt++, r);
+                }
             }
 
-            GridArrays.clearTail(rows, cnt);
+            GridArrays.clearTail(rows, resCnt);
 
-            return true;
+            return resCnt > 0;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
index d74d344..9dcad9b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.tree.io;
 
+import org.apache.ignite.internal.util.typedef.internal.S;
+
 /**
  * Registry for IO versions.
  */
@@ -99,4 +101,9 @@ public final class IOVersions<V extends PageIO> {
 
         return res;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IOVersions.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index 2de0b8c..0a42129 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -88,6 +88,12 @@ public abstract class PageIO {
     /** */
     private static IOVersions<? extends BPlusLeafIO<?>> h2LeafIOs;
 
+    /** */
+    private static IOVersions<? extends BPlusInnerIO<?>> h2MvccInnerIOs;
+
+    /** */
+    private static IOVersions<? extends BPlusLeafIO<?>> h2MvccLeafIOs;
+
     /** Maximum payload size. */
     public static final short MAX_PAYLOAD_SIZE = 2048;
 
@@ -98,6 +104,12 @@ public abstract class PageIO {
     private static List<IOVersions<? extends BPlusLeafIO<?>>> h2ExtraLeafIOs = 
new ArrayList<>(MAX_PAYLOAD_SIZE);
 
     /** */
+    private static List<IOVersions<? extends BPlusInnerIO<?>>> 
h2ExtraMvccInnerIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
+
+    /** */
+    private static List<IOVersions<? extends BPlusLeafIO<?>>> 
h2ExtraMvccLeafIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
+
+    /** */
     public static final int TYPE_OFF = 0;
 
     /** */
@@ -184,24 +196,42 @@ public abstract class PageIO {
     public static final short T_PART_CNTRS = 20;
 
     /** Index for payload == 1. */
-    public static final short T_H2_EX_REF_LEAF_START = 10000;
+    public static final short T_H2_EX_REF_LEAF_START = 10_000;
 
     /** */
     public static final short T_H2_EX_REF_LEAF_END = T_H2_EX_REF_LEAF_START + 
MAX_PAYLOAD_SIZE - 1;
 
     /** */
-    public static final short T_H2_EX_REF_INNER_START = 20000;
+    public static final short T_H2_EX_REF_INNER_START = 20_000;
 
     /** */
     public static final short T_H2_EX_REF_INNER_END = T_H2_EX_REF_INNER_START 
+ MAX_PAYLOAD_SIZE - 1;
 
     /** */
+    public static final short T_H2_EX_REF_MVCC_LEAF_START = 23_000;
+
+    /** */
+    public static final short T_H2_EX_REF_MVCC_LEAF_END = 
T_H2_EX_REF_MVCC_LEAF_START + MAX_PAYLOAD_SIZE - 1;
+
+    /** */
+    public static final short T_H2_EX_REF_MVCC_INNER_START = 26_000;
+
+    /** */
+    public static final short T_H2_EX_REF_MVCC_INNER_END = 
T_H2_EX_REF_MVCC_INNER_START + MAX_PAYLOAD_SIZE - 1;
+
+    /** */
     public static final short T_DATA_REF_MVCC_INNER = 21;
 
     /** */
     public static final short T_DATA_REF_MVCC_LEAF = 22;
 
     /** */
+    public static final short T_H2_MVCC_REF_LEAF = 23;
+
+    /** */
+    public static final short T_H2_MVCC_REF_INNER = 24;
+
+    /** */
     private final int ver;
 
     /** */
@@ -334,13 +364,19 @@ public abstract class PageIO {
      *
      * @param innerIOs Inner IO versions.
      * @param leafIOs Leaf IO versions.
+     * @param mvccInnerIOs Inner IO versions with mvcc enabled.
+     * @param mvccLeafIOs Leaf IO versions with mvcc enabled.
      */
     public static void registerH2(
         IOVersions<? extends BPlusInnerIO<?>> innerIOs,
-        IOVersions<? extends BPlusLeafIO<?>> leafIOs
+        IOVersions<? extends BPlusLeafIO<?>> leafIOs,
+        IOVersions<? extends BPlusInnerIO<?>> mvccInnerIOs,
+        IOVersions<? extends BPlusLeafIO<?>> mvccLeafIOs
     ) {
         h2InnerIOs = innerIOs;
         h2LeafIOs = leafIOs;
+        h2MvccInnerIOs = mvccInnerIOs;
+        h2MvccLeafIOs = mvccLeafIOs;
     }
 
     /**
@@ -348,8 +384,10 @@ public abstract class PageIO {
      *
      * @param innerExtIOs Extra versions.
      */
-    public static void registerH2ExtraInner(IOVersions<? extends 
BPlusInnerIO<?>> innerExtIOs) {
-        h2ExtraInnerIOs.add(innerExtIOs);
+    public static void registerH2ExtraInner(IOVersions<? extends 
BPlusInnerIO<?>> innerExtIOs, boolean mvcc) {
+        List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? 
h2ExtraMvccInnerIOs : h2ExtraInnerIOs;
+
+        ios.add(innerExtIOs);
     }
 
     /**
@@ -357,24 +395,30 @@ public abstract class PageIO {
      *
      * @param leafExtIOs Extra versions.
      */
-    public static void registerH2ExtraLeaf(IOVersions<? extends 
BPlusLeafIO<?>> leafExtIOs) {
-        h2ExtraLeafIOs.add(leafExtIOs);
+    public static void registerH2ExtraLeaf(IOVersions<? extends 
BPlusLeafIO<?>> leafExtIOs, boolean mvcc) {
+        List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? 
h2ExtraMvccLeafIOs : h2ExtraLeafIOs;
+
+        ios.add(leafExtIOs);
     }
 
     /**
      * @param idx Index.
      * @return IOVersions for given idx.
      */
-    public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int 
idx) {
-        return h2ExtraInnerIOs.get(idx);
+    public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int 
idx, boolean mvcc) {
+        List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? 
h2ExtraMvccInnerIOs : h2ExtraInnerIOs;
+
+        return ios.get(idx);
     }
 
     /**
      * @param idx Index.
      * @return IOVersions for given idx.
      */
-    public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int 
idx) {
-        return h2ExtraLeafIOs.get(idx);
+    public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int 
idx, boolean mvcc) {
+        List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? 
h2ExtraMvccLeafIOs : h2ExtraLeafIOs;
+
+        return ios.get(idx);
     }
 
     /**
@@ -493,13 +537,18 @@ public abstract class PageIO {
      */
     @SuppressWarnings("unchecked")
     public static <Q extends BPlusIO<?>> Q getBPlusIO(int type, int ver) 
throws IgniteCheckedException {
-
         if (type >= T_H2_EX_REF_LEAF_START && type <= T_H2_EX_REF_LEAF_END)
             return (Q)h2ExtraLeafIOs.get(type - 
T_H2_EX_REF_LEAF_START).forVersion(ver);
 
         if (type >= T_H2_EX_REF_INNER_START && type <= T_H2_EX_REF_INNER_END)
             return (Q)h2ExtraInnerIOs.get(type - 
T_H2_EX_REF_INNER_START).forVersion(ver);
 
+        if (type >= T_H2_EX_REF_MVCC_LEAF_START && type <= 
T_H2_EX_REF_MVCC_LEAF_END)
+            return (Q)h2ExtraMvccLeafIOs.get(type - 
T_H2_EX_REF_MVCC_LEAF_START).forVersion(ver);
+
+        if (type >= T_H2_EX_REF_MVCC_INNER_START && type <= 
T_H2_EX_REF_MVCC_INNER_END)
+            return (Q)h2ExtraMvccInnerIOs.get(type - 
T_H2_EX_REF_MVCC_INNER_START).forVersion(ver);
+
         switch (type) {
             case T_H2_REF_INNER:
                 if (h2InnerIOs == null)
@@ -513,6 +562,18 @@ public abstract class PageIO {
 
                 return (Q)h2LeafIOs.forVersion(ver);
 
+            case T_H2_MVCC_REF_INNER:
+                if (h2MvccInnerIOs == null)
+                    break;
+
+                return (Q)h2MvccInnerIOs.forVersion(ver);
+
+            case T_H2_MVCC_REF_LEAF:
+                if (h2MvccLeafIOs == null)
+                    break;
+
+                return (Q)h2MvccLeafIOs.forVersion(ver);
+
             case T_DATA_REF_INNER:
                 return (Q)DataInnerIO.VERSIONS.forVersion(ver);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 59b7613..fb5728a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -382,10 +382,11 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
     /**
      * @param newRow New row.
+     * @param mvccVer Mvcc version for update.
      * @param prevRow Previous row.
      * @throws IgniteCheckedException In case of error.
      */
-    public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow)
+    public void store(CacheDataRow newRow, @Nullable MvccCoordinatorVersion 
mvccVer, @Nullable CacheDataRow prevRow)
         throws IgniteCheckedException {
         assert enabled();
         assert newRow != null && newRow.value() != null && newRow.link() != 0 
: newRow;
@@ -405,7 +406,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             }
 
             if (qryProcEnabled)
-                qryProc.store(cctx, newRow, prevRow);
+                qryProc.store(cctx, newRow, mvccVer, prevRow);
         }
         finally {
             invalidateResultCache();
@@ -417,9 +418,11 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
     /**
      * @param key Key.
      * @param prevRow Previous row.
+     * @param newVer Mvcc version for remove operation.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) 
throws IgniteCheckedException {
+    public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow, 
@Nullable MvccCoordinatorVersion newVer)
+        throws IgniteCheckedException {
         if (!QueryUtils.isEnabled(cctx.config()))
             return; // No-op.
 
@@ -435,7 +438,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
             // val may be null if we have no previous value. We should not call processor in this case.
             if (qryProcEnabled && prevRow != null)
-                qryProc.remove(cctx, prevRow);
+                qryProc.remove(cctx, prevRow, newVer);
         }
         finally {
             invalidateResultCache();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 31aa2ca..c36d5cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -26,7 +26,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
-import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 
 /**
@@ -62,7 +62,7 @@ public abstract class AbstractDataInnerIO extends 
BPlusInnerIO<CacheSearchRow> i
 
         if (storeMvccVersion()) {
             assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 
: row;
-            assert row.mvccCounter() != COUNTER_NA : row;
+            assert row.mvccCounter() != MVCC_COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
             off += 8;
@@ -82,7 +82,7 @@ public abstract class AbstractDataInnerIO extends 
BPlusInnerIO<CacheSearchRow> i
             long mvccCntr = getMvccCounter(pageAddr, idx);
 
             assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
-            assert mvccCntr != COUNTER_NA;
+            assert mvccCntr != MVCC_COUNTER_NA;
 
             return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
                 hash,
@@ -128,7 +128,7 @@ public abstract class AbstractDataInnerIO extends 
BPlusInnerIO<CacheSearchRow> i
             long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
 
             assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
-            assert mvccCntr != COUNTER_NA;
+            assert mvccCntr != MVCC_COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index 47d8a6f..d60aef2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -26,7 +26,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
-import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 
 /**
@@ -64,7 +64,7 @@ public abstract class AbstractDataLeafIO extends 
BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = row.mvccCounter();
 
             assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
-            assert mvccUpdateCntr != COUNTER_NA;
+            assert mvccUpdateCntr != MVCC_COUNTER_NA;
 
             PageUtils.putLong(pageAddr, off, mvccCrdVer);
             off += 8;
@@ -100,7 +100,7 @@ public abstract class AbstractDataLeafIO extends 
BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = 
((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
 
             assert unmaskCoordinatorVersion(mvccUpdateTopVer) > 0 : 
mvccUpdateCntr;
-            assert mvccUpdateCntr != COUNTER_NA;
+            assert mvccUpdateCntr != MVCC_COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index 85624d5..5537794 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -73,9 +73,9 @@ public class CacheDataRowStore extends RowStore {
      * @return Search row.
      */
     MvccDataRow mvccRow(int cacheId, int hash, long link, 
CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
-        if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && 
versionForRemovedValue(crdVer)) {
-            if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
-                return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, 
mvccCntr);
+        if (versionForRemovedValue(crdVer)) {
+            if (rowData == CacheDataRowAdapter.RowData.NO_KEY || rowData == 
CacheDataRowAdapter.RowData.LINK_ONLY)
+                return MvccDataRow.removedRowNoKey(link, partId, cacheId, 
crdVer, mvccCntr);
             else
                 rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a699cd3..9f85640 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -169,7 +169,7 @@ public class CacheDataTree extends 
BPlusTree<CacheSearchRow, CacheDataRow> {
 
         long mvccCntr = io.getMvccCounter(pageAddr, idx);
 
-        assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA;
+        assert row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 
         cmp = Long.compare(row.mvccCounter(), mvccCntr);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index 3d02b27..36ffd49 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataInnerIO extends 
AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index 58ae9ff..ae6fc0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataLeafIO extends 
AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index 19a5c47..98a5450 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -59,6 +59,6 @@ public final class DataInnerIO extends AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index ab10b96..b644e6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -59,6 +59,6 @@ public final class DataLeafIO extends AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index d1e90d4..8853d6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -50,15 +50,13 @@ public class DataRow extends CacheDataRowAdapter {
         this.part = part;
 
         try {
-            // We can not init data row lazily because underlying buffer can be concurrently cleared.
-            initFromLink(grp, rowData);
+            // We can not init data row lazily outside of entry lock because underlying buffer can be concurrently cleared.
+            if (rowData != RowData.LINK_ONLY)
+                initFromLink(grp, rowData);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
         }
-
-        if (key != null)
-            key.partition(part);
     }
 
     /**
@@ -84,11 +82,18 @@ public class DataRow extends CacheDataRowAdapter {
     /**
      *
      */
-    protected DataRow() {
+    DataRow() {
         super(0);
     }
 
     /** {@inheritDoc} */
+    @Override public void key(KeyCacheObject key) {
+        super.key(key);
+
+        hash = key.hashCode();
+    }
+
+    /** {@inheritDoc} */
     @Override public int partition() {
         return part;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
new file mode 100644
index 0000000..92caf70
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ * Mvcc search row (cache ID, key, mvcc version) which additionally holds the row link.
+ */
+public class MvccCleanupRow extends MvccSearchRow {
+    /** */
+    private final long link;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param crdVer Mvcc coordinator version.
+     * @param mvccCntr Mvcc counter.
+     * @param link Link.
+     */
+    MvccCleanupRow(int cacheId, KeyCacheObject key, long crdVer, long 
mvccCntr, long link) {
+        super(cacheId, key, crdVer, mvccCntr);
+
+        assert link != 0L;
+
+        this.link = link;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long link() {
+        return link;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index 916ea93..a2cf079 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,10 +18,9 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
-import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
 
 /**
  *
@@ -34,6 +33,13 @@ public class MvccDataRow extends DataRow {
     private long mvccCntr;
 
     /**
+     *
+     */
+    private MvccDataRow() {
+        // No-op.
+    }
+
+    /**
      * @param grp Context.
      * @param hash Key hash.
      * @param link Link.
@@ -42,24 +48,17 @@ public class MvccDataRow extends DataRow {
      * @param crdVer Mvcc coordinator version.
      * @param mvccCntr Mvcc counter.
      */
-    MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData 
rowData, long crdVer, long mvccCntr) {
+    public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, 
RowData rowData, long crdVer, long mvccCntr) {
         super(grp, hash, link, part, rowData);
 
-        assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
-        assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+        assertMvccVersionValid(crdVer, mvccCntr);
 
         this.crdVer = crdVer;
         this.mvccCntr = mvccCntr;
     }
 
     /**
-     *
-     */
-    private MvccDataRow() {
-        // No-op.
-    }
-
-    /**
+     * @param link Link.
      * @param part Partition.
      * @param cacheId Cache ID.
      * @param crdVer Mvcc coordinator version.
@@ -67,12 +66,14 @@ public class MvccDataRow extends DataRow {
      * @return Row.
      */
     static MvccDataRow removedRowNoKey(
+        long link,
         int part,
         int cacheId,
         long crdVer,
         long mvccCntr) {
         MvccDataRow row = new MvccDataRow();
 
+        row.link = link;
         row.cacheId = cacheId;
         row.part = part;
         row.crdVer = crdVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index fb2a6cf..0b37a94 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
@@ -34,6 +33,7 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
@@ -51,7 +51,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     private GridLongList activeTxs;
 
     /** */
-    private List<CacheSearchRow> cleanupRows;
+    private List<MvccCleanupRow> cleanupRows;
 
     /** */
     private final MvccCoordinatorVersion mvccVer;
@@ -66,7 +66,9 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
      * @param key Key.
      * @param val Value.
      * @param ver Version.
+     * @param expireTime Expire time.
      * @param mvccVer Mvcc version.
+     * @param needOld {@code True} if need previous value.
      * @param part Partition.
      * @param cacheId Cache ID.
      */
@@ -109,7 +111,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     /**
      * @return Rows which are safe to cleanup.
      */
-    public List<CacheSearchRow> cleanupRows() {
+    public List<MvccCleanupRow> cleanupRows() {
         return cleanupRows;
     }
 
@@ -175,8 +177,6 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
                     if (needOld)
                         oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
                 }
-                res = versionForRemovedValue(rowCrdVerMasked) ?
-                    UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
             }
         }
 
@@ -199,26 +199,25 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
 
             int cmp;
 
+            long rowCntr = rowIo.getMvccCounter(pageAddr, idx);
+
             if (crdVer == rowCrdVer)
-                cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
+                cmp = Long.compare(mvccVer.cleanupVersion(), rowCntr);
             else
                 cmp = 1;
 
             if (cmp >= 0) {
                 // Do not cleanup oldest version.
                 if (canCleanup) {
-                    CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
-
-                    assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+                    assert assertMvccVersionValid(rowCrdVer, rowCntr);
 
                     // Should not be possible to cleanup active tx.
-                    assert rowCrdVer != crdVer
-                        || !mvccVer.activeTransactions().contains(row.mvccCounter());
+                    assert rowCrdVer != crdVer || !mvccVer.activeTransactions().contains(rowCntr);
 
                     if (cleanupRows == null)
                         cleanupRows = new ArrayList<>();
 
-                    cleanupRows.add(row);
+                    cleanupRows.add(new MvccCleanupRow(cacheId, key, rowCrdVerMasked, rowCntr, rowIo.getLink(pageAddr, idx)));
                 }
                 else
                     canCleanup = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
index 5bdc495..5fd7e8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
@@ -83,7 +83,7 @@ public class SearchRow implements CacheSearchRow {
 
     /** {@inheritDoc} */
     @Override public long mvccCounter() {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index e6300a9..dab2ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
     /** Version which is less then any version generated on coordinator. */
     private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
-        new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L);
+        new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.MVCC_START_CNTR, 0L);
 
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index b0a3831..5bd4bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -217,10 +218,13 @@ public interface GridQueryIndexing {
      * @param cctx Cache context.
      * @param type Type descriptor.
      * @param row New row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @throws IgniteCheckedException If failed.
      */
-    public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row)
-        throws IgniteCheckedException;
+    public void store(GridCacheContext cctx,
+        GridQueryTypeDescriptor type,
+        CacheDataRow row,
+        @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException;
 
     /**
      * Removes index entry by key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 4886b1b..3b3dec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -1700,14 +1701,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /**
      * @param cctx Cache context.
      * @param newRow New row.
+     * @param mvccVer Mvcc version for update.
      * @param prevRow Previous row.
      * @throws IgniteCheckedException In case of error.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    public void store(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow prevRow)
-        throws IgniteCheckedException {
+    public void store(GridCacheContext cctx,
+        CacheDataRow newRow,
+        @Nullable MvccCoordinatorVersion mvccVer,
+        @Nullable CacheDataRow prevRow) throws IgniteCheckedException
+    {
         assert cctx != null;
         assert newRow != null;
+        assert !cctx.mvccEnabled() || mvccVer != null;
 
         KeyCacheObject key = newRow.key();
 
@@ -1734,14 +1740,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     prevRow.value(),
                     false);
 
-                if (prevValDesc != null && prevValDesc != desc)
+                if (prevValDesc != null && prevValDesc != desc) {
                     idx.remove(cctx, prevValDesc, prevRow);
+
+                    prevRow = null;
+                }
             }
 
             if (desc == null)
                 return;
 
-            idx.store(cctx, desc, newRow);
+            if (cctx.mvccEnabled()) {
+                // Add new mvcc value.
+                idx.store(cctx, desc, newRow, null);
+
+                // Set info about more recent version for previous record.
+                if (prevRow != null)
+                    idx.store(cctx, desc, prevRow, mvccVer);
+            }
+            else
+                idx.store(cctx, desc, newRow, null);
         }
         finally {
             busyLock.leaveBusy();
@@ -2304,12 +2322,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
     /**
      * @param cctx Cache context.
-     * @param val Row.
+     * @param val Value removed from cache.
+     * @param newVer Mvcc version for remove operation.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void remove(GridCacheContext cctx, CacheDataRow val)
+    public void remove(GridCacheContext cctx, CacheDataRow val, @Nullable MvccCoordinatorVersion newVer)
         throws IgniteCheckedException {
         assert val != null;
+        assert cctx.mvccEnabled() || newVer == null;
 
         if (log.isDebugEnabled())
             log.debug("Remove [cacheName=" + cctx.name() + ", key=" + val.key()+ ", val=" + val.value() + "]");
@@ -2330,7 +2350,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             if (desc == null)
                 return;
 
-            idx.remove(cctx, desc, val);
+            if (cctx.mvccEnabled()) {
+                if (newVer != null) {
+                    // Set info about more recent version for previous record.
+                    idx.store(cctx, desc, val, newVer);
+                }
+                else
+                    idx.remove(cctx, desc, val);
+            }
+            else
+                idx.remove(cctx, desc, val);
         }
         finally {
             busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index b0b758a..d77fb81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
@@ -310,7 +311,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow val) {
+        @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row,
+            @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException {
             // No-op.
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 999144f..1949cd2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -160,6 +161,71 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cfgC Optional closure applied to cache configuration.
+     * @throws Exception If failed.
+     */
+    final void cacheRecreate(@Nullable IgniteInClosure<CacheConfiguration> cfgC) throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        final int PARTS = 64;
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+        if (cfgC != null)
+            cfgC.apply(ccfg);
+
+        IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg);
+
+        for (int k = 0; k < PARTS * 2; k++) {
+            assertNull(cache.get(k));
+
+            int vals = k % 3 + 1;
+
+            for (int v = 0; v < vals; v++)
+                cache.put(k, new MvccTestAccount(v, 1));
+
+            assertEquals(vals - 1, cache.get(k).val);
+        }
+
+        srv0.destroyCache(cache.getName());
+
+        ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+        if (cfgC != null)
+            cfgC.apply(ccfg);
+
+        cache = (IgniteCache)srv0.createCache(ccfg);
+
+        for (int k = 0; k < PARTS * 2; k++) {
+            assertNull(cache.get(k));
+
+            int vals = k % 3 + 2;
+
+            for (int v = 0; v < vals; v++)
+                cache.put(k, new MvccTestAccount(v + 100, 1));
+
+            assertEquals(vals - 1 + 100, cache.get(k).val);
+        }
+
+        srv0.destroyCache(cache.getName());
+
+        ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+        IgniteCache<Long, Long> cache0 = (IgniteCache)srv0.createCache(ccfg);
+
+        for (long k = 0; k < PARTS * 2; k++) {
+            assertNull(cache0.get(k));
+
+            int vals = (int)(k % 3 + 2);
+
+            for (long v = 0; v < vals; v++)
+                cache0.put(k, v);
+
+            assertEquals((long)(vals - 1), (Object)cache0.get(k));
+        }
+    }
+
+    /**
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -332,13 +398,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                     Map<Integer, Integer> lastUpdateCntrs = new HashMap<>();
 
+                    SqlFieldsQuery sumQry = new SqlFieldsQuery("select sum(val) from MvccTestAccount");
+
                     while (!stop.get()) {
                         while (keys.size() < ACCOUNTS)
                             keys.add(rnd.nextInt(ACCOUNTS));
 
                         TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
 
-                        Map<Integer, MvccTestAccount> accounts;
+                        Map<Integer, MvccTestAccount> accounts = null;
 
                         try {
                             switch (readMode) {
@@ -378,7 +446,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                                         for (List<?> row : cache.cache.query(qry)) {
                                             Integer id = (Integer)row.get(0);
-                                            Integer val = (Integer)row.get(0);
+                                            Integer val = (Integer)row.get(1);
 
                                             MvccTestAccount old = accounts.put(id, new MvccTestAccount(val, 1));
 
@@ -389,6 +457,18 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                                     break;
                                 }
 
+                                case SQL_SUM: {
+                                    List<List<?>> res = cache.cache.query(sumQry).getAll();
+
+                                    assertEquals(1, res.size());
+
+                                    BigDecimal sum = (BigDecimal)res.get(0).get(0);
+
+                                    assertEquals(ACCOUNT_START_VAL * ACCOUNTS, sum.intValue());
+
+                                    break;
+                                }
+
                                 default: {
                                     fail();
 
@@ -400,29 +480,31 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                             cache.readUnlock();
                         }
 
-                        if (!withRmvs)
-                            assertEquals(ACCOUNTS, accounts.size());
+                        if (accounts != null) {
+                            if (!withRmvs)
+                                assertEquals(ACCOUNTS, accounts.size());
 
-                        int sum = 0;
+                            int sum = 0;
 
-                        for (int i = 0; i < ACCOUNTS; i++) {
-                            MvccTestAccount account = accounts.get(i);
+                            for (int i = 0; i < ACCOUNTS; i++) {
+                                MvccTestAccount account = accounts.get(i);
 
-                            if (account != null) {
-                                sum += account.val;
+                                if (account != null) {
+                                    sum += account.val;
 
-                                Integer cntr = lastUpdateCntrs.get(i);
+                                    Integer cntr = lastUpdateCntrs.get(i);
 
-                                if (cntr != null)
-                                    assertTrue(cntr <= account.updateCnt);
+                                    if (cntr != null)
+                                        assertTrue(cntr <= account.updateCnt);
 
-                                lastUpdateCntrs.put(i, cntr);
+                                    lastUpdateCntrs.put(i, cntr);
+                                }
+                                else
+                                    assertTrue(withRmvs);
                             }
-                            else
-                                assertTrue(withRmvs);
-                        }
 
-                        assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                            assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                        }
                     }
 
                     if (idx == 0) {
@@ -713,7 +795,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
      * @param node Node.
      * @throws Exception If failed.
      */
-    final void checkActiveQueriesCleanup(Ignite node) throws Exception {
+    protected final void checkActiveQueriesCleanup(Ignite node) throws Exception {
         final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators();
 
         assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition(
@@ -827,7 +909,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
         SCAN,
 
         /** */
-        SQL_ALL
+        SQL_ALL,
+
+        /** */
+        SQL_SUM
     }
 
     /**

Reply via email to