ignite-3478 Support for optimistic transactions

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3f33d6a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3f33d6a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3f33d6a5

Branch: refs/heads/ignite-3478
Commit: 3f33d6a5d2e5be3b52f32a6746129decd736f0af
Parents: 4c06131
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Oct 16 14:50:32 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Oct 16 14:50:32 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   34 +-
 .../processors/cache/GridCacheEntryInfo.java    |   10 +-
 .../processors/cache/GridCacheMapEntry.java     |   35 +-
 .../cache/GridCacheMvccEntryInfo.java           |    3 +
 .../cache/GridCacheSharedContext.java           |    3 +
 .../cache/IgniteCacheOffheapManager.java        |    6 +
 .../cache/IgniteCacheOffheapManagerImpl.java    |   94 +-
 .../GridDistributedTxRemoteAdapter.java         |    8 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   12 +-
 .../distributed/dht/GridDhtGetSingleFuture.java |   13 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   33 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   10 +-
 .../dht/GridPartitionedGetFuture.java           |   34 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   26 +-
 .../dht/atomic/GridDhtAtomicCache.java          |    6 +-
 .../dht/colocated/GridDhtColocatedCache.java    |   30 +-
 .../GridDhtPartitionsExchangeFuture.java        |   59 +-
 .../distributed/near/GridNearGetRequest.java    |    8 +-
 ...arOptimisticSerializableTxPrepareFuture.java |   71 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   60 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |  126 ++
 .../GridNearPessimisticTxPrepareFuture.java     |   18 +-
 .../near/GridNearSingleGetRequest.java          |   41 +-
 .../near/GridNearTxFinishAndAckFuture.java      |   16 +-
 .../near/GridNearTxFinishFuture.java            |   33 +-
 .../cache/distributed/near/GridNearTxLocal.java |  101 +-
 .../near/GridNearTxPrepareRequest.java          |    2 +-
 .../mvcc/CacheCoordinatorsDiscoveryData.java    |    3 +
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  203 ++-
 .../cache/mvcc/CoordinatorAckRequestQuery.java  |  130 ++
 .../cache/mvcc/CoordinatorAckRequestTx.java     |  203 +++
 .../mvcc/CoordinatorAckRequestTxAndQuery.java   |  123 ++
 .../mvcc/CoordinatorAckRequestTxAndQueryEx.java |  147 ++
 .../mvcc/CoordinatorActiveQueriesMessage.java   |  136 ++
 .../cache/mvcc/CoordinatorFutureResponse.java   |    3 +
 .../cache/mvcc/CoordinatorQueryAckRequest.java  |  130 --
 .../cache/mvcc/CoordinatorTxAckRequest.java     |  194 --
 .../cache/mvcc/CoordinatorWaitTxsRequest.java   |    3 +
 .../cache/mvcc/MvccCoordinatorChangeAware.java  |   31 +
 .../cache/mvcc/MvccCoordinatorFuture.java       |   30 +
 .../cache/mvcc/MvccCoordinatorVersion.java      |    5 +
 .../mvcc/MvccCoordinatorVersionResponse.java    |   12 +-
 .../mvcc/MvccCoordinatorVersionWithoutTxs.java  |  173 ++
 .../processors/cache/mvcc/MvccCounter.java      |    5 +-
 .../cache/mvcc/MvccEmptyLongList.java           |   53 +
 .../processors/cache/mvcc/MvccQueryAware.java   |   43 -
 .../processors/cache/mvcc/MvccQueryTracker.java |   64 +-
 .../cache/mvcc/PreviousCoordinatorQueries.java  |   30 +-
 .../processors/cache/mvcc/TxMvccInfo.java       |   27 +-
 .../persistence/GridCacheOffheapManager.java    |    6 +-
 .../cache/transactions/IgniteTxAdapter.java     |   10 +
 .../transactions/IgniteTxLocalAdapter.java      |   10 +-
 .../cache/tree/AbstractDataInnerIO.java         |   11 +-
 .../cache/tree/AbstractDataLeafIO.java          |    2 +-
 .../processors/cache/tree/CacheDataTree.java    |    2 +-
 .../processors/cache/tree/MvccRemoveRow.java    |    3 +-
 .../processors/cache/tree/MvccUpdateRow.java    |   35 +-
 .../datastreamer/DataStreamerImpl.java          |    4 +-
 .../internal/TestRecordingCommunicationSpi.java |   10 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 1701 ++++++++++++++----
 .../testsuites/IgniteCacheMvccTestSuite.java    |   42 +
 61 files changed, 3536 insertions(+), 940 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 99bc8af..35e9af3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -103,13 +103,17 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQueryEx;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorActiveQueriesMessage;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestQuery;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTx;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQuery;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
+import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import 
org.apache.ignite.internal.processors.cache.mvcc.NewCoordinatorQueryAckRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
@@ -892,7 +896,7 @@ public class GridIoMessageFactory implements MessageFactory 
{
                 break;
 
             case 131: // TODO IGNITE-3478 fix constants.
-                msg = new CoordinatorTxAckRequest();
+                msg = new CoordinatorAckRequestTx();
 
                 break;
 
@@ -907,7 +911,7 @@ public class GridIoMessageFactory implements MessageFactory 
{
                 break;
 
             case 134:
-                msg = new CoordinatorQueryAckRequest();
+                msg = new CoordinatorAckRequestQuery();
 
                 break;
 
@@ -937,10 +941,30 @@ public class GridIoMessageFactory implements 
MessageFactory {
                 return msg;
 
             case 141:
+                msg = new CoordinatorAckRequestTxAndQuery();
+
+                return msg;
+
+            case 142:
+                msg = new CoordinatorAckRequestTxAndQueryEx();
+
+                return msg;
+
+            case 143:
                 msg = new MvccCounter();
 
                 return msg;
 
+            case 144:
+                msg = new CoordinatorActiveQueriesMessage();
+
+                return msg;
+
+            case 145:
+                msg = new MvccCoordinatorVersionWithoutTxs();
+
+                return msg;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index e09d33c..86a2253 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccEmptyLongList;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -82,8 +83,13 @@ public class GridCacheEntryInfo implements Message, 
MvccCoordinatorVersion {
     }
 
     /** {@inheritDoc} */
-    @Override public MvccLongList activeTransactions() {
-        return null;
+    @Override public final MvccLongList activeTransactions() {
+        return MvccEmptyLongList.INSTANCE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final MvccCoordinatorVersion withoutActiveTransactions() {
+        return this;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index a1535e9..391a56e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -559,7 +559,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         boolean retVer,
         boolean keepBinary,
         boolean reserveForLoad,
-        MvccCoordinatorVersion mvccVer,
+        @Nullable MvccCoordinatorVersion mvccVer,
         @Nullable ReaderArguments readerArgs
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert !(retVer && readThrough);
@@ -748,7 +748,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     long expTime = CU.toExpireTime(ttl);
 
                     // Update indexes before actual write to entry.
-                    storeValue(ret, expTime, nextVer, null);
+                    if (cctx.mvccEnabled())
+                        cctx.offheap().mvccInitialValue(this, ret, nextVer, 
expTime, null); // TODO IGNITE-3478.
+                    else
+                        storeValue(ret, expTime, nextVer, null);
 
                     update(ret, expTime, ttl, nextVer, true);
 
@@ -1016,6 +1019,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     this,
                     val,
                     newVer,
+                    expireTime,
                     mvccVer);
             }
             else
@@ -2091,7 +2095,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     ", val=" + val + ']');
             }
 
-            removeValue();
+            if (cctx.mvccEnabled())
+                cctx.offheap().mvccRemoveAll(this);
+            else
+                removeValue();
         }
 
         onMarkedObsolete();
@@ -2285,7 +2292,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         ver = newVer;
         flags &= ~IS_EVICT_DISABLED;
 
-        removeValue();
+        if (cctx.mvccEnabled())
+            cctx.offheap().mvccRemoveAll(this);
+        else
+            removeValue();
 
         onInvalidate();
 
@@ -2520,7 +2530,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             long delta = expireTime - U.currentTimeMillis();
 
             if (delta <= 0) {
-                removeValue();
+                if (cctx.mvccEnabled())
+                    cctx.offheap().mvccRemoveAll(this);
+                else
+                    removeValue();
 
                 return true;
             }
@@ -2605,7 +2618,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, 
cctx);
 
                 if (cctx.mvccEnabled()) {
-                    cctx.offheap().mvccInitialValue(this, val, ver, mvccVer);
+                    cctx.offheap().mvccInitialValue(this, val, ver, expTime, 
mvccVer);
 
                     if (val != null)
                         update(val, expTime, ttl, ver, true);
@@ -2769,7 +2782,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, 
cctx);
 
                 if (val != null) {
-                    storeValue(val, expTime, newVer, null);
+                    if (cctx.mvccEnabled()) // TODO IGNITE-3478
+                        cctx.offheap().mvccInitialValue(this, val, newVer, 
expTime, null);
+                    else
+                        storeValue(val, expTime, newVer, null);
 
                     if (deletedUnlocked())
                         deletedUnlocked(false);
@@ -3086,7 +3102,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         if (log.isTraceEnabled())
             log.trace("onExpired clear [key=" + key + ", entry=" + 
System.identityHashCode(this) + ']');
 
-        removeValue();
+        if (cctx.mvccEnabled())
+            cctx.offheap().mvccRemoveAll(this);
+        else
+            removeValue();
 
         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
             cctx.events().addEvent(partition(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
index c914f58..0e72d98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
@@ -27,6 +27,9 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
  */
 public class GridCacheMvccEntryInfo extends GridCacheEntryInfo {
     /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
     private long mvccCrdVer;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index f4e4d48..41e5175 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -897,6 +897,9 @@ public class GridCacheSharedContext<K, V> {
 
             GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
 
+            if (cacheCtx.mvccEnabled() != activeCacheCtx.mvccEnabled())
+                return "caches with different mvcc settings can't be enlisted 
in one transaction";
+
             if (cacheCtx.systemTx()) {
                 if (activeCacheCtx.cacheId() != cacheCtx.cacheId())
                     return "system transaction can include only one cache";

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 2c070fc..975d349 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -200,6 +200,7 @@ public interface IgniteCacheOffheapManager {
         GridCacheMapEntry entry,
         @Nullable CacheObject val,
         GridCacheVersion ver,
+        long expireTime,
         MvccCoordinatorVersion mvccVer
     ) throws IgniteCheckedException;
 
@@ -208,6 +209,7 @@ public interface IgniteCacheOffheapManager {
      * @param entry Entry.
      * @param val Value.
      * @param ver Cache version.
+     * @param expireTime Expire time.
      * @param mvccVer Mvcc update version.
      * @return Transactions to wait for before finishing current transaction.
      * @throws IgniteCheckedException If failed.
@@ -217,6 +219,7 @@ public interface IgniteCacheOffheapManager {
         GridCacheMapEntry entry,
         CacheObject val,
         GridCacheVersion ver,
+        long expireTime,
         MvccCoordinatorVersion mvccVer
     ) throws IgniteCheckedException;
 
@@ -545,6 +548,7 @@ public interface IgniteCacheOffheapManager {
             KeyCacheObject key,
             @Nullable CacheObject val,
             GridCacheVersion ver,
+            long expireTime,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
 
         /**
@@ -553,6 +557,7 @@ public interface IgniteCacheOffheapManager {
          * @param key Key.
          * @param val Value.
          * @param ver Version.
+         * @param expireTime Expire time.
          * @param mvccVer Mvcc version.
          * @return List of transactions to wait for.
          * @throws IgniteCheckedException If failed.
@@ -563,6 +568,7 @@ public interface IgniteCacheOffheapManager {
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
+            long expireTime,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 2bff203..c07e2a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -41,6 +41,8 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
+import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -387,12 +389,14 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         GridCacheMapEntry entry,
         CacheObject val,
         GridCacheVersion ver,
+        long expireTime,
         MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
         return dataStore(entry.localPartition()).mvccInitialValue(
             entry.context(),
             entry.key(),
             val,
             ver,
+            expireTime,
             mvccVer);
     }
 
@@ -402,12 +406,17 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         GridCacheMapEntry entry,
         CacheObject val,
         GridCacheVersion ver,
+        long expireTime,
         MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+        if (entry.detached() || entry.isNear())
+            return null;
+
         return dataStore(entry.localPartition()).mvccUpdate(entry.context(),
             primary,
             entry.key(),
             val,
             ver,
+            expireTime,
             mvccVer);
     }
 
@@ -417,6 +426,9 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         GridCacheMapEntry entry,
         MvccCoordinatorVersion mvccVer
     ) throws IgniteCheckedException {
+        if (entry.detached() || entry.isNear())
+            return null;
+
         return dataStore(entry.localPartition()).mvccRemove(entry.context(),
             primary,
             entry.key(),
@@ -425,6 +437,9 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
     /** {@inheritDoc} */
     @Override public void mvccRemoveAll(GridCacheMapEntry entry) throws 
IgniteCheckedException {
+        if (entry.detached() || entry.isNear())
+            return;
+
         dataStore(entry.localPartition()).mvccRemoveAll(entry.context(), 
entry.key());
     }
 
@@ -749,7 +764,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                             curPart = ds.partId();
 
                             // TODO IGNITE-3478, mvcc with cache groups.
-                            if (mvccVer != null)
+                            if (grp.mvccEnabled())
                                 cur = ds.mvccCursor(mvccVer);
                             else
                                 cur = cacheId == CU.UNDEFINED_CACHE_ID ? 
ds.cursor() : ds.cursor(cacheId);
@@ -1383,17 +1398,14 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             KeyCacheObject key,
             @Nullable CacheObject val,
             GridCacheVersion ver,
+            long expireTime,
             MvccCoordinatorVersion mvccVer)
             throws IgniteCheckedException
         {
-            assert mvccVer != null;
-
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
 
             try {
-                assert val != null || 
CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
-
                 int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
 
                 CacheObjectContext coCtx = cctx.cacheObjectContext();
@@ -1403,6 +1415,17 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 MvccUpdateRow updateRow;
 
+                boolean newVal = false;
+
+                // TODO IGNITE-3478: null is passed for values loaded from the store; this needs better handling.
+                if (mvccVer == null) {
+                    mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, 
CacheCoordinatorsProcessor.START_VER, 0L);
+
+                    newVal = true;
+                }
+                else
+                    assert val != null || 
CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+
                 if (val != null) {
                     val.valueBytes(coCtx);
 
@@ -1410,7 +1433,9 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                         key,
                         val,
                         ver,
+                        expireTime,
                         mvccVer,
+                        false,
                         partId,
                         cacheId);
                 }
@@ -1418,6 +1443,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                     updateRow = new MvccRemoveRow(
                         key,
                         mvccVer,
+                        false,
                         partId,
                         cacheId);
                 }
@@ -1427,6 +1453,25 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 rowStore.addRow(updateRow);
 
+                if (newVal) {
+                    GridCursor<CacheDataRow> cur = dataTree.find(
+                        new MvccSearchRow(cacheId, key, Long.MAX_VALUE, 
Long.MAX_VALUE),
+                        new MvccSearchRow(cacheId, key, 1L, 1L),
+                        CacheDataRowAdapter.RowData.KEY_ONLY);
+
+                    while (cur.next()) {
+                        CacheDataRow row = cur.get();
+
+                        assert row.link() != 0;
+
+                        boolean rmvd = dataTree.removex(row);
+
+                        assert rmvd;
+
+                        rowStore.removeRow(row.link());
+                    }
+                }
+
                 boolean old = dataTree.putx(updateRow);
 
                 assert !old;
@@ -1448,8 +1493,10 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
+            long expireTime,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             assert mvccVer != null;
+            assert primary || mvccVer.activeTransactions().size() == 0 : 
mvccVer;
 
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
@@ -1463,11 +1510,15 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 key.valueBytes(coCtx);
                 val.valueBytes(coCtx);
 
+                boolean needOld = hasPendingEntries || cctx.isQueryEnabled();
+
                 MvccUpdateRow updateRow = new MvccUpdateRow(
                     key,
                     val,
                     ver,
+                    expireTime,
                     mvccVer,
+                    needOld,
                     partId,
                     cacheId);
 
@@ -1507,6 +1558,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             KeyCacheObject key,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             assert mvccVer != null;
+            assert primary || mvccVer.activeTransactions().size() == 0 : 
mvccVer;
 
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
@@ -1519,9 +1571,12 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 // Make sure value bytes initialized.
                 key.valueBytes(coCtx);
 
+                boolean needOld = hasPendingEntries || cctx.isQueryEnabled();
+
                 MvccRemoveRow updateRow = new MvccRemoveRow(
                     key,
                     mvccVer,
+                    needOld,
                     partId,
                     cacheId);
 
@@ -1573,16 +1628,25 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 new MvccSearchRow(cacheId, key, 1, 1),
                 CacheDataRowAdapter.RowData.KEY_ONLY);
 
+            boolean first = true;
+
             while (cur.next()) {
                 CacheDataRow row = cur.get();
 
-                assert row.link() != 0;
+                assert row.link() != 0 : row;
 
                 boolean rmvd = dataTree.removex(row);
 
                 assert rmvd;
 
                 rowStore.removeRow(row.link());
+
+                if (first) {
+                    if (!versionForRemovedValue(row.mvccCoordinatorVersion()))
+                        decrementSize(cctx.cacheId());
+
+                    first = false;
+                }
             }
         }
 
@@ -1937,18 +2001,20 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                         long rowCrdVerMasked = row.mvccCoordinatorVersion();
 
-                        long rowCrdVer = 
unmaskCoordinatorVersion(rowCrdVerMasked);
+                        if (ver != null) {
+                            long rowCrdVer = 
unmaskCoordinatorVersion(rowCrdVerMasked);
 
-                        if (rowCrdVer > ver.coordinatorVersion())
-                            continue;
+                            if (rowCrdVer > ver.coordinatorVersion())
+                                continue;
 
-                        if (rowCrdVer == ver.coordinatorVersion() && 
row.mvccCounter() > ver.counter())
-                            continue;
+                            if (rowCrdVer == ver.coordinatorVersion() && 
row.mvccCounter() > ver.counter())
+                                continue;
 
-                        MvccLongList txs = ver.activeTransactions();
+                            MvccLongList txs = ver.activeTransactions();
 
-                        if (txs != null && rowCrdVer == 
ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
-                            continue;
+                            if (txs != null && rowCrdVer == 
ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
+                                continue;
+                        }
 
                         if (curKey != null && row.key().equals(curKey))
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 77039cc..839f3d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -474,7 +474,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                     cctx.database().checkpointReadLock();
 
                     try {
-                        assert !txState.mvccEnabled(cctx) || mvccInfo != null;
+                        assert !txState.mvccEnabled(cctx) || mvccInfo != null 
: "Mvcc is not initialized: " + this;
 
                         Collection<IgniteTxEntry> entries = near() ? 
allEntries() : writeEntries();
 
@@ -597,7 +597,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                         resolveTaskName(),
                                                         dhtVer,
                                                         
txEntry.updateCounter(),
-                                                        mvccInfo != null ? 
mvccInfo.version() : null);
+                                                        
mvccVersionForUpdate());
                                                 else {
                                                     assert val != null : 
txEntry;
 
@@ -622,7 +622,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                         resolveTaskName(),
                                                         dhtVer,
                                                         
txEntry.updateCounter(),
-                                                        mvccInfo != null ? 
mvccInfo.version() : null);
+                                                        
mvccVersionForUpdate());
 
                                                     // Keep near entry up to 
date.
                                                     if (nearCached != null) {
@@ -655,7 +655,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                     resolveTaskName(),
                                                     dhtVer,
                                                     txEntry.updateCounter(),
-                                                    mvccInfo != null ? 
mvccInfo.version() : null);
+                                                    mvccVersionForUpdate());
 
                                                 // Keep near entry up to date.
                                                 if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5dbb3a8..e1c5379 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -848,9 +848,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
      * @param taskNameHash Task name hash.
      * @param expiry Expiry.
      * @param skipVals Skip vals flag.
+     * @param mvccVer Mvcc version.
      * @return Future for the operation.
      */
-    public GridDhtGetSingleFuture getDhtSingleAsync(
+    GridDhtGetSingleFuture getDhtSingleAsync(
         UUID nodeId,
         long msgId,
         KeyCacheObject key,
@@ -861,7 +862,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiry,
         boolean skipVals,
-        boolean recovery
+        boolean recovery,
+        MvccCoordinatorVersion mvccVer
     ) {
         GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>(
             ctx,
@@ -875,7 +877,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
             taskNameHash,
             expiry,
             skipVals,
-            recovery);
+            recovery,
+            mvccVer);
 
         fut.init();
 
@@ -903,7 +906,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                 req.taskNameHash(),
                 expiryPlc,
                 req.skipValues(),
-                req.recovery());
+                req.recovery(),
+                req.mvccVersion());
 
         fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
             @Override public void 
apply(IgniteInternalFuture<GridCacheEntryInfo> f) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 9fb4b0a..7462406 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -103,6 +104,9 @@ public final class GridDhtGetSingleFuture<K, V> extends 
GridFutureAdapter<GridCa
     /** Recovery context flag. */
     private final boolean recovery;
 
+    /** */
+    private final MvccCoordinatorVersion mvccVer;
+
     /**
      * @param cctx Context.
      * @param msgId Message ID.
@@ -115,6 +119,7 @@ public final class GridDhtGetSingleFuture<K, V> extends 
GridFutureAdapter<GridCa
      * @param taskNameHash Task name hash code.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
+     * @param mvccVer Mvcc version.
      */
     public GridDhtGetSingleFuture(
         GridCacheContext<K, V> cctx,
@@ -128,7 +133,8 @@ public final class GridDhtGetSingleFuture<K, V> extends 
GridFutureAdapter<GridCa
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         assert reader != null;
         assert key != null;
@@ -145,6 +151,7 @@ public final class GridDhtGetSingleFuture<K, V> extends 
GridFutureAdapter<GridCa
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
         this.recovery = recovery;
+        this.mvccVer = mvccVer;
 
         futId = IgniteUuid.randomUuid();
 
@@ -366,7 +373,7 @@ public final class GridDhtGetSingleFuture<K, V> extends 
GridFutureAdapter<GridCa
                 expiryPlc,
                 skipVals,
                 recovery,
-                null);  // TODO IGNITE-3478
+                mvccVer);
         }
         else {
             final ReaderArguments args = readerArgs;
@@ -392,7 +399,7 @@ public final class GridDhtGetSingleFuture<K, V> extends 
GridFutureAdapter<GridCa
                                 expiryPlc,
                                 skipVals,
                                 recovery,
-                                null);  // TODO IGNITE-3478
+                                mvccVer);
 
                         fut0.listen(createGetFutureListener());
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d624e2c..5e8428d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -300,7 +301,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
 
             assert mvccInfo != null;
 
-            IgniteInternalFuture fut = 
cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+            IgniteInternalFuture fut = 
cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(), waitTxs);
 
             add(fut);
 
@@ -412,7 +413,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
         if (tx.onePhaseCommit())
             return false;
 
-        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != 
null;
+        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != 
null || F.isEmpty(tx.writeEntries());
 
         boolean sync = tx.syncMode() == FULL_SYNC;
 
@@ -423,6 +424,12 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
 
         int miniId = 0;
 
+        // No need to process active transactions on backups.
+        TxMvccInfo mvccInfo = tx.mvccInfo();
+
+        if (mvccInfo != null)
+            mvccInfo = mvccInfo.withoutActiveTransactions();
+
         // Create mini futures.
         for (GridDistributedTxMapping dhtMapping : dhtMap.values()) {
             ClusterNode n = dhtMapping.primary();
@@ -470,7 +477,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                 updCntrs,
                 false,
                 false,
-                tx.mvccInfo());
+                mvccInfo);
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : 
tx.xidVersion());
 
@@ -540,7 +547,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                     tx.activeCachesDeploymentEnabled(),
                     false,
                     false,
-                    tx.mvccInfo());
+                    mvccInfo);
 
                 req.writeVersion(tx.writeVersion());
 
@@ -610,13 +617,23 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        // TODO IGNITE-3478 (mvcc wait txs fut)
         Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
             @SuppressWarnings("unchecked")
             @Override public String apply(IgniteInternalFuture<?> f) {
-                return "[node=" + ((MiniFuture)f).node().id() +
-                    ", loc=" + ((MiniFuture)f).node().isLocal() +
-                    ", done=" + f.isDone() + "]";
+                if (f.getClass() == MiniFuture.class) {
+                    return "[node=" + ((MiniFuture)f).node().id() +
+                        ", loc=" + ((MiniFuture)f).node().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+                else if (f instanceof MvccCoordinatorFuture) {
+                    MvccCoordinatorFuture crdFut = (MvccCoordinatorFuture)f;
+
+                    return "[mvccCrdNode=" + crdFut.coordinatorNodeId() +
+                        ", loc=" + 
crdFut.coordinatorNodeId().equals(cctx.localNodeId()) +
+                        ", done=" + f.isDone() + "]";
+                }
+                else
+                    return f.toString();
             }
         });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 3143c4f..623dea8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1344,6 +1344,12 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
 
         final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
 
+        // No need to process active transactions on backups.
+        TxMvccInfo mvccInfo = tx.mvccInfo();
+
+        if (mvccInfo != null)
+            mvccInfo = mvccInfo.withoutActiveTransactions();
+
         // Create mini futures.
         for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
             assert !dhtMapping.empty();
@@ -1387,7 +1393,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 tx.activeCachesDeploymentEnabled(),
                 tx.storeWriteThrough(),
                 retVal,
-                tx.mvccInfo());
+                mvccInfo);
 
             int idx = 0;
 
@@ -1501,7 +1507,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                     tx.activeCachesDeploymentEnabled(),
                     tx.storeWriteThrough(),
                     retVal,
-                    tx.mvccInfo());
+                    mvccInfo);
 
                 for (IgniteTxEntry entry : nearMapping.entries()) {
                     if (CU.writes().apply(entry)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7993d05..e424a18 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -45,7 +45,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware;
+import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -61,13 +61,15 @@ import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Colocated get future.
  */
-public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAdapter<K, V> implements MvccQueryAware {
+public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAdapter<K, V>
+    implements MvccCoordinatorChangeAware, 
IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -78,6 +80,9 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
     private static IgniteLogger log;
 
     /** */
+    protected final MvccCoordinatorVersion mvccVer;
+
+    /** */
     private MvccQueryTracker mvccTracker;
 
     /**
@@ -94,6 +99,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
      * @param skipVals Skip values flag.
      * @param needVer If {@code true} returns values as tuples containing 
value and version.
      * @param keepCacheObjects Keep cache objects flag.
+     * @param mvccVer Mvcc version.
      */
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
@@ -107,7 +113,8 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
         boolean needVer,
-        boolean keepCacheObjects
+        boolean keepCacheObjects,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         super(cctx,
             keys,
@@ -121,6 +128,9 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
             needVer,
             keepCacheObjects,
             recovery);
+        assert mvccVer == null || cctx.mvccEnabled();
+
+        this.mvccVer = mvccVer;
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, 
GridPartitionedGetFuture.class);
@@ -133,6 +143,9 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
         if (!cctx.mvccEnabled())
             return null;
 
+        if (mvccVer != null)
+            return mvccVer;
+
         MvccCoordinatorVersion ver = mvccTracker.mvccVersion();
 
         assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + 
"]";
@@ -158,7 +171,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
                 canRemap ? cctx.affinity().affinityTopologyVersion() : 
cctx.shared().exchange().readyAffinityVersion();
         }
 
-        if (cctx.mvccEnabled()) {
+        if (cctx.mvccEnabled() && mvccVer == null) {
             mvccTracker = new MvccQueryTracker(cctx, canRemap, this);
 
             trackable = true;
@@ -174,13 +187,14 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
     }
 
     /** {@inheritDoc} */
-    @Override public void onMvccVersionReceived(AffinityTopologyVersion 
topVer) {
-        initialMap(topVer);
-    }
+    @Override public void apply(AffinityTopologyVersion topVer, 
IgniteCheckedException e) {
+        if (e != null)
+            onDone(e);
+        else {
+            assert topVer != null;
 
-    /** {@inheritDoc} */
-    @Override public void onMvccVersionError(IgniteCheckedException e) {
-        onDone(e);
+            initialMap(topVer);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index b34687f..c31b8b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,11 +41,12 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -122,6 +123,9 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
     @GridToStringInclude
     private ClusterNode node;
 
+    /** */
+    protected final MvccCoordinatorVersion mvccVer;
+
     /**
      * @param cctx Context.
      * @param key Key.
@@ -149,9 +153,11 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
         boolean skipVals,
         boolean needVer,
         boolean keepCacheObjects,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         assert key != null;
+        assert mvccVer == null || cctx.mvccEnabled();
 
         AffinityTopologyVersion lockedTopVer = 
cctx.shared().lockedTopologyVersion(null);
 
@@ -176,6 +182,7 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
         this.keepCacheObjects = keepCacheObjects;
         this.recovery = recovery;
         this.topVer = topVer;
+        this.mvccVer = mvccVer;
 
         futId = IgniteUuid.randomUuid();
 
@@ -230,7 +237,8 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
                 taskName == null ? 0 : taskName.hashCode(),
                 expiryPlc,
                 skipVals,
-                recovery);
+                recovery,
+                mvccVer);
 
             final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -288,7 +296,8 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
                 /*add reader*/false,
                 needVer,
                 cctx.deploymentEnabled(),
-                recovery);
+                recovery,
+                mvccVer);
 
             try {
                 cctx.io().send(node, req, cctx.ioPolicy());
@@ -355,7 +364,8 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
                 boolean skipEntry = readNoEntry;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(cctx, key); // TODO 
IGNITE-3478
+                    CacheDataRow row = mvccVer != null ? 
cctx.offheap().mvccRead(cctx, key, mvccVer) :
+                        cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();
@@ -398,8 +408,8 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
                                 taskName,
                                 expiryPlc,
                                 true,
-                                null,
-                                null); // TODO IGNITE-3478
+                                mvccVer,
+                                null);
 
                             if (res != null) {
                                 v = res.value();
@@ -418,7 +428,7 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
                                 taskName,
                                 expiryPlc,
                                 true,
-                                null); // TODO IGNITE-3478
+                                mvccVer);
                         }
 
                         colocated.context().evicts().touch(entry, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 16416cc..d6862fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1385,7 +1385,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             skipVals,
             needVer,
             false,
-            recovery);
+            recovery,
+            null);
 
         fut.init();
 
@@ -1591,7 +1592,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             expiry,
             skipVals,
             needVer,
-            false);
+            false,
+            null);
 
         fut.init(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7364cb3..c975edb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
@@ -241,7 +242,8 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             skipVals,
             needVer,
             /*keepCacheObjects*/false,
-            opCtx != null && opCtx.recovery());
+            opCtx != null && opCtx.recovery(),
+            null);
 
         fut.init();
 
@@ -319,7 +321,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
      * @param needVer Need version.
      * @return Loaded values.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(
+    private IgniteInternalFuture<Map<K, V>> loadAsync(
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean forcePrimary,
@@ -341,7 +343,8 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             expiryPlc,
             skipVals,
             needVer,
-            false);
+            false,
+            null);
     }
 
     /**
@@ -370,7 +373,8 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
         boolean skipVals,
         boolean needVer,
         boolean keepCacheObj,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         GridPartitionedSingleGetFuture fut = new 
GridPartitionedSingleGetFuture(ctx,
             ctx.toCacheKeyObject(key),
@@ -384,7 +388,8 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             skipVals,
             needVer,
             keepCacheObj,
-            recovery);
+            recovery,
+            mvccVer);
 
         fut.init();
 
@@ -403,6 +408,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
      * @param skipVals Skip values flag.
      * @param needVer If {@code true} returns values as tuples containing 
value and version.
      * @param keepCacheObj Keep cache objects flag.
+     * @param mvccVer Mvcc version.
      * @return Load future.
      */
     public final IgniteInternalFuture<Map<K, V>> loadAsync(
@@ -417,8 +423,11 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
         boolean needVer,
-        boolean keepCacheObj
+        boolean keepCacheObj,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
+        assert mvccVer == null || ctx.mvccEnabled();
+
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
 
@@ -426,7 +435,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             expiryPlc = expiryPolicy(null);
 
         // Optimisation: try to resolve value locally and escape 'get future' 
creation.
-        if (!forcePrimary && ctx.affinityNode() && !ctx.mvccEnabled()) {
+        if (!forcePrimary && ctx.affinityNode() && (!ctx.mvccEnabled() || 
mvccVer != null)) {
             try {
                 Map<K, V> locVals = null;
 
@@ -499,7 +508,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
                                             taskName,
                                             expiryPlc,
                                             !deserializeBinary,
-                                            null,
+                                            mvccVer,
                                             null);
 
                                         if (getRes != null) {
@@ -519,7 +528,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
                                             taskName,
                                             expiryPlc,
                                             !deserializeBinary,
-                                            null);
+                                            mvccVer);
                                     }
 
                                     // Entry was not in memory or in swap, so 
we remove it from cache.
@@ -602,7 +611,8 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             expiryPlc,
             skipVals,
             needVer,
-            keepCacheObj);
+            keepCacheObj,
+            mvccVer);
 
         fut.init(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 88095ab..c14d6e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -79,8 +79,9 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware;
+import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -657,7 +658,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
-            updateTopologies(crdNode, 
cctx.coordinators().currentCoordinator());
+            updateTopologies(crd, crdNode, 
cctx.coordinators().currentCoordinator());
 
             switch (exchange) {
                 case ALL: {
@@ -760,11 +761,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param exchCrd Exchange coordinator node.
      * @param crd Coordinator flag.
      * @param mvccCrd Mvcc coordinator.
      * @throws IgniteCheckedException If failed.
      */
-    private void updateTopologies(boolean crd, MvccCoordinator mvccCrd) throws 
IgniteCheckedException {
+    private void updateTopologies(ClusterNode exchCrd, boolean crd, 
MvccCoordinator mvccCrd) throws IgniteCheckedException {
         for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
             if (grp.isLocal())
                 continue;
@@ -808,24 +810,41 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             Map<MvccCounter, Integer> activeQrys = new HashMap<>();
 
-            for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) {
-                if (fut instanceof MvccQueryAware) {
-                    MvccCoordinatorVersion ver = 
((MvccQueryAware)fut).onMvccCoordinatorChange(mvccCrd);
+            for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures())
+                processMvccCoordinatorChange(mvccCrd, fut, activeQrys);
 
-                    if (ver != null ) {
-                        MvccCounter cntr = new 
MvccCounter(ver.coordinatorVersion(), ver.counter());
+            for (IgniteInternalTx tx : cctx.tm().activeTransactions())
+                processMvccCoordinatorChange(mvccCrd, tx, activeQrys);
 
-                        Integer cnt = activeQrys.get(cntr);
+            exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys);
 
-                        if (cnt == null)
-                            activeQrys.put(cntr, 1);
-                        else
-                            activeQrys.put(cntr, cnt + 1);
-                    }
-                }
-            }
+            if (exchCrd == null || !mvccCrd.nodeId().equals(exchCrd.id()))
+                cctx.coordinators().sendActiveQueries(mvccCrd.nodeId(), 
activeQrys);
+        }
+    }
 
-            exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys);
+    /**
+     * @param mvccCrd New mvcc coordinator, passed to {@code onMvccCoordinatorChange}.
+     * @param nodeObj Active future or transaction; skipped unless it is a {@code MvccCoordinatorChangeAware}.
+     * @param activeQrys Per-counter count of objects that returned a version, incremented in place.
+     */
+    private void processMvccCoordinatorChange(MvccCoordinator mvccCrd,
+        Object nodeObj,
+        Map<MvccCounter, Integer> activeQrys)
+    {
+        if (nodeObj instanceof MvccCoordinatorChangeAware) {
+            MvccCoordinatorVersion ver = 
((MvccCoordinatorChangeAware)nodeObj).onMvccCoordinatorChange(mvccCrd);
+
+            if (ver != null ) {
+                MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), 
ver.counter());
+
+                Integer cnt = activeQrys.get(cntr);
+
+                if (cnt == null)
+                    activeQrys.put(cntr, 1);
+                else
+                    activeQrys.put(cntr, cnt + 1);
+            }
         }
     }
 
@@ -1288,9 +1307,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 msg.partitionHistoryCounters(partHistReserved0);
         }
 
-        Map<UUID, Map<MvccCounter, Integer>> activeQueries = 
exchCtx.activeQueries();
+        if (exchCtx.newMvccCoordinator() && 
cctx.coordinators().currentCoordinatorId().equals(node.id())) {
+            Map<UUID, Map<MvccCounter, Integer>> activeQueries = 
exchCtx.activeQueries();
 
-        msg.activeQueries(activeQueries != null ? 
activeQueries.get(cctx.localNodeId()) : null);
+            msg.activeQueries(activeQueries != null ? 
activeQueries.get(cctx.localNodeId()) : null);
+        }
 
         if (stateChangeExchange() && changeGlobalStateE != null)
             msg.setError(changeGlobalStateE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index c6f3280..ab927d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Get request. Responsible for obtaining entry from primary node. 'Near' 
means 'Primary' here, not 'Near Cache'.
@@ -132,6 +133,7 @@ public class GridNearGetRequest extends GridCacheIdMessage 
implements GridCacheD
      * @param createTtl New TTL to set after entry is created, -1 to leave 
unchanged.
      * @param accessTtl New TTL to set after entry is accessed, -1 to leave 
unchanged.
      * @param addDepInfo Deployment info.
+     * @param mvccVer Mvcc version, may be {@code null}.
      */
     public GridNearGetRequest(
         int cacheId,
@@ -149,7 +151,7 @@ public class GridNearGetRequest extends GridCacheIdMessage 
implements GridCacheD
         boolean skipVals,
         boolean addDepInfo,
         boolean recovery,
-        MvccCoordinatorVersion mvccVer
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         assert futId != null;
         assert miniId != null;
@@ -194,9 +196,9 @@ public class GridNearGetRequest extends GridCacheIdMessage 
implements GridCacheD
     }
 
     /**
-     * @return Counter.
+     * @return Mvcc version.
      */
-    public MvccCoordinatorVersion mvccVersion() {
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
         return mvccVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 9d36bca..b606f0e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -38,6 +38,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -52,7 +54,6 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -69,10 +70,6 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARING;
 public class GridNearOptimisticSerializableTxPrepareFuture extends 
GridNearOptimisticTxPrepareFutureAdapter {
     /** */
     @GridToStringExclude
-    private KeyLockFuture keyLockFut;
-
-    /** */
-    @GridToStringExclude
     private ClientRemapFuture remapFut;
 
     /** */
@@ -189,10 +186,20 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 tx.removeMapping(m.primary().id());
         }
 
+        prepareError(e);
+    }
+
+    /**
+     * @param e Prepare error; recorded only if no error was set before, and used to complete {@code keyLockFut}.
+     */
+    private void prepareError(Throwable e) {
         ERR_UPD.compareAndSet(this, null, e);
 
         if (keyLockFut != null)
             keyLockFut.onDone(e);
+
+        if (mvccVerFut != null)
+            mvccVerFut.onDone();
     }
 
     /** {@inheritDoc} */
@@ -345,11 +352,25 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         boolean hasNearCache = false;
 
+        MvccCoordinator mvccCrd = null;
+
         for (IgniteTxEntry write : writes) {
             map(write, topVer, mappings, txMapping, remap, topLocked);
 
-            if (write.context().isNear())
+            GridCacheContext cctx = write.context();
+
+            if (cctx.isNear())
                 hasNearCache = true;
+
+            if (cctx.mvccEnabled() && mvccCrd == null) {
+                mvccCrd = cctx.affinity().mvccCoordinator(topVer);
+
+                if (mvccCrd == null) {
+                    
onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer));
+
+                    return;
+                }
+            }
         }
 
         for (IgniteTxEntry read : reads)
@@ -365,6 +386,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
             return;
         }
 
+        assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null || 
F.isEmpty(writes);
+
         tx.addEntryMapping(mappings.values());
 
         cctx.mvcc().recheckPendingLocks();
@@ -376,12 +399,16 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         MiniFuture locNearEntriesFut = null;
 
+        int lockCnt = keyLockFut != null ? 1 : 0;
+
         // Create futures in advance to have all futures when process {@link 
GridNearTxPrepareResponse#clientRemapVersion}.
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
             MiniFuture fut = new MiniFuture(this, m, ++miniId);
 
+            lockCnt++;
+
             add((IgniteInternalFuture)fut);
 
             if (m.primary().isLocal() && m.hasNearCacheEntries() && 
m.hasColocatedCacheEntries()) {
@@ -390,9 +417,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
                 locNearEntriesFut = fut;
 
                 add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId));
+
+                lockCnt++;
             }
         }
 
+        if (mvccCrd != null)
+            initMvccVersionFuture(mvccCrd, lockCnt, remap);
+
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
 
         Iterator<IgniteInternalFuture<?>> it = futs.iterator();
@@ -703,20 +735,20 @@ public class 
GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         Collection<String> futs = F.viewReadOnly(futures(),
             new C1<IgniteInternalFuture<?>, String>() {
                 @Override public String apply(IgniteInternalFuture<?> f) {
-                    return "[node=" + ((MiniFuture)f).primary().id() +
-                        ", loc=" + ((MiniFuture)f).primary().isLocal() +
-                        ", done=" + f.isDone() + "]";
-                }
-            },
-            new P1<IgniteInternalFuture<?>>() {
-                @Override public boolean apply(IgniteInternalFuture<?> f) {
-                    return isMini(f);
+                    if (isMini(f)) {
+                        return "[node=" + ((MiniFuture)f).primary().id() +
+                            ", loc=" + ((MiniFuture)f).primary().isLocal() +
+                            ", done=" + f.isDone() +
+                            ", err=" + f.error() + "]";
+                    }
+                    else
+                        return f.toString();
                 }
             });
 
         return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, 
this,
             "innerFuts", futs,
-            "keyLockFut", keyLockFut,
+            "remap", remapFut != null,
             "tx", tx,
             "super", super.toString());
     }
@@ -924,7 +956,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
                                                         remap(res);
                                                     }
                                                     catch 
(IgniteCheckedException e) {
-                                                        
ERR_UPD.compareAndSet(parent, null, e);
+                                                        parent.prepareError(e);
 
                                                         onDone(e);
                                                     }
@@ -937,7 +969,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
 
                                             err0.retryReadyFuture(affFut);
 
-                                            ERR_UPD.compareAndSet(parent, 
null, err0);
+                                            parent.prepareError(err0);
 
                                             onDone(err0);
                                         }
@@ -948,7 +980,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
                                                 parent);
                                         }
 
-                                        ERR_UPD.compareAndSet(parent, null, e);
+                                        parent.prepareError(e);
 
                                         onDone(e);
                                     }
@@ -963,6 +995,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
 
                         // Finish this mini future (need result only on client 
node).
                         onDone(parent.cctx.kernalContext().clientNode() ? res 
: null);
+
+                        if (parent.mvccVerFut != null)
+                            parent.mvccVerFut.onLockReceived();
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index ef3075e..1fba1dd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -41,6 +41,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -49,7 +51,6 @@ import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -73,10 +74,6 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARING;
  */
 public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepareFutureAdapter {
     /** */
-    @GridToStringExclude
-    private KeyLockFuture keyLockFut;
-
-    /** */
     private int miniId;
 
     /** */
@@ -382,6 +379,18 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
             return;
         }
 
+        if (write.context().mvccEnabled()) {
+            MvccCoordinator mvccCrd = 
write.context().affinity().mvccCoordinator(topVer);
+
+            if (mvccCrd == null) {
+                onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer));
+
+                return;
+            }
+
+            initMvccVersionFuture(mvccCrd, keyLockFut != null ? 2 : 1, remap);
+        }
+
         if (keyLockFut != null)
             keyLockFut.onAllKeysAdded();
 
@@ -426,14 +435,27 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
 
         boolean hasNearCache = false;
 
+        MvccCoordinator mvccCrd = null;
+
         for (IgniteTxEntry write : writes) {
             write.clearEntryReadVersion();
 
             GridDistributedTxMapping updated = map(write, topVer, cur, 
topLocked, remap);
 
-            if(updated == null)
-                // an exception occurred while transaction mapping, stop 
further processing
+            if (updated == null) {
+                // An exception occurred while transaction mapping, stop 
further processing.
                 break;
+            }
+
+            if (write.context().mvccEnabled() && mvccCrd == null) {
+                mvccCrd = write.context().affinity().mvccCoordinator(topVer);
+
+                if (mvccCrd == null) {
+                    
onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer));
+
+                    break;
+                }
+            }
 
             if (write.context().isNear())
                 hasNearCache = true;
@@ -474,6 +496,11 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
             return;
         }
 
+        assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null;
+
+        if (mvccCrd != null)
+            initMvccVersionFuture(mvccCrd, keyLockFut != null ? 2 : 1, remap);
+
         if (keyLockFut != null)
             keyLockFut.onAllKeysAdded();
 
@@ -497,8 +524,12 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
     private void proceedPrepare(final Queue<GridDistributedTxMapping> 
mappings) {
         final GridDistributedTxMapping m = mappings.poll();
 
-        if (m == null)
+        if (m == null) {
+            if (mvccVerFut != null)
+                mvccVerFut.onLockReceived();
+
             return;
+        }
 
         proceedPrepare(m, mappings);
     }
@@ -786,9 +817,13 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
             @Override public String apply(IgniteInternalFuture<?> f) {
-                return "[node=" + ((MiniFuture)f).node().id() +
-                    ", loc=" + ((MiniFuture)f).node().isLocal() +
-                    ", done=" + f.isDone() + "]";
+                if (isMini(f)) {
+                    return "[node=" + ((MiniFuture)f).node().id() +
+                        ", loc=" + ((MiniFuture)f).node().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+                else
+                    return f.toString();
             }
         }, new P1<IgniteInternalFuture<Object>>() {
             @Override public boolean apply(IgniteInternalFuture<Object> fut) {
@@ -798,7 +833,6 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
 
         return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
             "innerFuts", futs,
-            "keyLockFut", keyLockFut,
             "tx", tx,
             "super", super.toString());
     }
@@ -954,6 +988,8 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
                         // Proceed prepare before finishing mini future.
                         if (mappings != null)
                             parent.proceedPrepare(mappings);
+                        else if (parent.mvccVerFut != null)
+                            parent.mvccVerFut.onLockReceived();
 
                         // Finish this mini future.
                         onDone((GridNearTxPrepareResponse)null);

Reply via email to