ignite-3479 Coordinators reassign on failure

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/761e43d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/761e43d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/761e43d3

Branch: refs/heads/ignite-3478
Commit: 761e43d3039cf8c58c9c7b0ec2dde68238d71647
Parents: 7f4defd
Author: sboikov <[email protected]>
Authored: Fri Sep 29 14:29:03 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 29 14:29:03 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |    5 +-
 .../ignite/internal/GridKernalContext.java      |    6 +
 .../ignite/internal/GridKernalContextImpl.java  |   14 +-
 .../apache/ignite/internal/IgniteKernal.java    |    4 +-
 .../managers/communication/GridIoManager.java   |    3 +
 .../communication/GridIoMessageFactory.java     |   18 +
 .../internal/managers/discovery/DiscoCache.java |   13 +
 .../discovery/GridDiscoveryManager.java         |    4 +
 .../processors/affinity/AffinityAssignment.java |    9 +-
 .../affinity/GridAffinityAssignment.java        |   15 +-
 .../affinity/GridAffinityAssignmentCache.java   |   21 +-
 .../affinity/GridAffinityProcessor.java         |    2 +-
 .../processors/affinity/GridAffinityUtils.java  |    2 +-
 .../affinity/HistoryAffinityAssignment.java     |   12 +-
 .../cache/CacheAffinitySharedManager.java       |   29 +-
 .../processors/cache/ExchangeContext.java       |   43 +-
 .../cache/GridCacheAffinityManager.java         |    5 +
 .../GridCachePartitionExchangeManager.java      |   74 +-
 .../processors/cache/GridCacheProcessor.java    |    3 -
 .../cache/GridCacheSharedContext.java           |   20 +-
 .../GridDistributedTxRemoteAdapter.java         |    8 +-
 .../dht/GridClientPartitionTopology.java        |    7 +
 .../distributed/dht/GridDhtCacheAdapter.java    |    2 +
 .../dht/GridDhtPartitionTopology.java           |    4 +
 .../dht/GridDhtPartitionTopologyImpl.java       |   11 +
 .../distributed/dht/GridDhtTxFinishFuture.java  |   15 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |   20 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |    2 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |   45 +-
 .../dht/GridDhtTxPrepareRequest.java            |   18 +-
 .../dht/GridPartitionedGetFuture.java           |  118 +-
 .../GridDhtPartitionsExchangeFuture.java        |   72 +-
 .../GridDhtPartitionsSingleMessage.java         |   64 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   70 +-
 .../near/GridNearTxFinishAndAckFuture.java      |   10 +-
 .../near/GridNearTxFinishFuture.java            |   17 +-
 .../near/GridNearTxFinishRequest.java           |   18 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |    2 +-
 .../near/GridNearTxPrepareResponse.java         |   20 +-
 .../mvcc/CacheCoordinatorsDiscoveryData.java    |   42 +
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 1304 ++++++++++++++++++
 .../mvcc/CacheCoordinatorsSharedManager.java    |  999 --------------
 .../mvcc/CoordinatorAssignmentHistory.java      |   71 -
 .../processors/cache/mvcc/MvccCoordinator.java  |  101 ++
 .../processors/cache/mvcc/MvccCounter.java      |  163 +++
 .../processors/cache/mvcc/MvccQueryAware.java   |   43 +
 .../processors/cache/mvcc/MvccQueryTracker.java |  232 ++++
 .../cache/mvcc/MvccResponseListener.java        |   10 +-
 .../mvcc/NewCoordinatorQueryAckRequest.java     |  156 +++
 .../cache/mvcc/PreviousCoordinatorQueries.java  |  190 +++
 .../processors/cache/mvcc/TxMvccInfo.java       |  141 ++
 .../wal/reader/IgniteWalIteratorFactory.java    |    2 +-
 .../wal/reader/StandaloneGridKernalContext.java |    6 +
 .../query/GridCacheDistributedQueryManager.java |    5 +-
 .../cache/query/GridCacheQueryManager.java      |   11 +-
 .../cache/transactions/IgniteInternalTx.java    |    6 +-
 .../cache/transactions/IgniteTxAdapter.java     |   17 +-
 .../cache/transactions/IgniteTxHandler.java     |    8 +-
 .../transactions/IgniteTxLocalAdapter.java      |   12 +-
 .../cache/tree/AbstractDataInnerIO.java         |    6 +-
 .../cache/tree/AbstractDataLeafIO.java          |    6 +-
 .../processors/cache/tree/CacheDataTree.java    |    4 +-
 .../cache/tree/CacheIdAwareDataInnerIO.java     |    4 +-
 .../cache/tree/CacheIdAwareDataLeafIO.java      |    4 +-
 .../processors/cache/tree/DataInnerIO.java      |    4 +-
 .../processors/cache/tree/DataLeafIO.java       |    4 +-
 .../processors/cache/tree/MvccDataRow.java      |    4 +-
 .../processors/cache/tree/SearchRow.java        |    4 +-
 .../util/future/GridCompoundFuture.java         |    4 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  764 +++++++++-
 .../pagemem/BPlusTreePageMemoryImplTest.java    |    1 -
 .../BPlusTreeReuseListPageMemoryImplTest.java   |    1 -
 .../MetadataStoragePageMemoryImplTest.java      |    1 -
 .../pagemem/PageMemoryImplNoLoadTest.java       |    1 -
 .../persistence/pagemem/PageMemoryImplTest.java |    1 -
 .../loadtests/hashmap/GridCacheTestContext.java |    2 -
 .../testframework/junits/GridAbstractTest.java  |    8 +
 77 files changed, 3765 insertions(+), 1402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 93ffe95..c3a8127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -61,7 +61,10 @@ public interface GridComponent {
         BINARY_PROC,
 
         /** Query processor. */
-        QUERY_PROC
+        QUERY_PROC,
+
+        /** */
+        CACHE_CRD_PROC
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 99c7cce..88251aa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.managers.indexing.GridIndexingManager;
 import 
org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -643,4 +644,9 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
      * @return Platform processor.
      */
     public PlatformProcessor platform();
+
+    /**
+     * @return Cache mvcc coordinator processor.
+     */
+    public CacheCoordinatorsProcessor coordinators();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 07e5970..86c0adc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -282,6 +283,10 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     @GridToStringExclude
     private DataStructuresProcessor dataStructuresProc;
 
+    /** Cache mvcc coordinators. */
+    @GridToStringExclude
+    private CacheCoordinatorsProcessor coordProc;
+
     /** */
     @GridToStringExclude
     private List<GridComponent> comps = new LinkedList<>();
@@ -344,7 +349,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
-    Map<String, ? extends ExecutorService> customExecSvcs;
+    private Map<String, ? extends ExecutorService> customExecSvcs;
 
     /** */
     @GridToStringExclude
@@ -579,6 +584,8 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
             poolProc = (PoolProcessor) comp;
         else if (comp instanceof GridMarshallerMappingProcessor)
             mappingProc = (GridMarshallerMappingProcessor)comp;
+        else if (comp instanceof CacheCoordinatorsProcessor)
+            coordProc = (CacheCoordinatorsProcessor)comp;
         else if (!(comp instanceof DiscoveryNodeValidationProcessor
                 || comp instanceof PlatformPluginProcessor))
             assert (comp instanceof GridPluginComponent) : "Unknown manager 
class: " + comp.getClass();
@@ -834,6 +841,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public CacheCoordinatorsProcessor coordinators() {
+        return coordProc;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteLogger log(String ctgr) {
         return config().getGridLogger().getLogger(ctgr);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index b015666..2dbbb7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -114,6 +114,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
@@ -937,8 +938,9 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
             // Start processors before discovery manager, so they will
             // be able to start receiving messages once discovery completes.
             try {
+                startProcessor(new CacheCoordinatorsProcessor(ctx));
                 
startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
-                startProcessor(new  GridAffinityProcessor(ctx));
+                startProcessor(new GridAffinityProcessor(ctx));
                 
startProcessor(createComponent(GridSegmentationProcessor.class, ctx));
                 
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
                 startProcessor(new GridClusterStateProcessor(ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 8f03911..adce492 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1659,6 +1659,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 if (e.getCause() instanceof ClusterTopologyCheckedException)
                     throw (ClusterTopologyCheckedException)e.getCause();
 
+                if (!ctx.discovery().alive(node))
+                    throw new ClusterTopologyCheckedException("Failed to send 
message, node left: " + node.id());
+
                 throw new IgniteCheckedException("Failed to send message (node 
may have left the grid or " +
                     "TCP connection cannot be established due to firewall 
issues) " +
                     "[node=" + node + ", topic=" + topic +

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 9bd04fa..99bc8af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -110,6 +110,9 @@ import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureRespons
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
+import 
org.apache.ignite.internal.processors.cache.mvcc.NewCoordinatorQueryAckRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -923,6 +926,21 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 return msg;
 
+            case 139:
+                msg = new TxMvccInfo();
+
+                return msg;
+
+            case 140:
+                msg = new NewCoordinatorQueryAckRequest();
+
+                return msg;
+
+            case 141:
+                msg = new MvccCounter();
+
+                return msg;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 95e855a..b6cae3f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -81,6 +82,9 @@ public class DiscoCache {
     /** */
     private final AffinityTopologyVersion topVer;
 
+    /** */
+    private final MvccCoordinator mvccCrd;
+
     /**
      * @param topVer Topology version.
      * @param state Current cluster state.
@@ -99,6 +103,7 @@ public class DiscoCache {
         AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
+        MvccCoordinator mvccCrd,
         List<ClusterNode> rmtNodes,
         List<ClusterNode> allNodes,
         List<ClusterNode> srvNodes,
@@ -111,6 +116,7 @@ public class DiscoCache {
         this.topVer = topVer;
         this.state = state;
         this.loc = loc;
+        this.mvccCrd = mvccCrd;
         this.rmtNodes = rmtNodes;
         this.allNodes = allNodes;
         this.srvNodes = srvNodes;
@@ -136,6 +142,13 @@ public class DiscoCache {
     }
 
     /**
+     * @return Mvcc coordinator node.
+     */
+    @Nullable public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /**
      * @return Topology version.
      */
     public AffinityTopologyVersion version() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 527399d..584df82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
@@ -616,6 +617,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                 DiscoCache discoCache = null;
 
+                ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer);
+
                 boolean locJoinEvt = type == EVT_NODE_JOINED && 
node.id().equals(locNode.id());
 
                 IgniteInternalFuture<Boolean> transitionWaitFut = null;
@@ -2261,6 +2264,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             topVer,
             state,
             loc,
+            ctx.coordinators().discoveryData().coordinator(),
             Collections.unmodifiableList(rmtNodes),
             Collections.unmodifiableList(allNodes),
             Collections.unmodifiableList(srvNodes),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index 06207d3..28dec1c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.processors.affinity;
 
-import org.apache.ignite.cluster.ClusterNode;
-
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 
 /**
  * Cached affinity calculations.
@@ -85,4 +85,9 @@ public interface AffinityAssignment {
      * @return Backup partitions for specified node ID.
      */
     public Set<Integer> backupPartitions(UUID nodeId);
+
+    /**
+     * @return Mvcc coordinator.
+     */
+    public MvccCoordinator mvccCoordinator();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 35130a3..a7549cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -39,6 +40,9 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
     /** Topology version. */
     private final AffinityTopologyVersion topVer;
 
+    /** */
+    private final MvccCoordinator mvccCrd;
+
     /** Collection of calculated affinity nodes. */
     private List<List<ClusterNode>> assignment;
 
@@ -69,6 +73,7 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
         this.topVer = topVer;
         primary = new HashMap<>();
         backup = new HashMap<>();
+        mvccCrd = null;
         clientEvtChange = false;
     }
 
@@ -79,7 +84,8 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
      */
     GridAffinityAssignment(AffinityTopologyVersion topVer,
         List<List<ClusterNode>> assignment,
-        List<List<ClusterNode>> idealAssignment) {
+        List<List<ClusterNode>> idealAssignment,
+        MvccCoordinator mvccCrd) {
         assert topVer != null;
         assert assignment != null;
         assert idealAssignment != null;
@@ -87,6 +93,7 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
         this.topVer = topVer;
         this.assignment = assignment;
         this.idealAssignment = idealAssignment.equals(assignment) ? assignment 
: idealAssignment;
+        this.mvccCrd = mvccCrd;
 
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -106,6 +113,7 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
         idealAssignment = aff.idealAssignment;
         primary = aff.primary;
         backup = aff.backup;
+        mvccCrd = aff.mvccCrd;
 
         clientEvtChange = true;
     }
@@ -264,6 +272,11 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public int hashCode() {
         return topVer.hashCode();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index f921251..fb4092a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -184,10 +185,24 @@ public class GridAffinityAssignmentCache {
      * @param affAssignment Affinity assignment for topology version.
      */
     public void initialize(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> affAssignment) {
+        MvccCoordinator mvccCrd = 
ctx.cache().context().coordinators().currentCoordinatorForCacheAffinity(topVer);
+
+        initialize(topVer, affAssignment, mvccCrd);
+    }
+
+    /**
+     * Initializes affinity with given topology version and assignment.
+     *
+     * @param topVer Topology version.
+     * @param affAssignment Affinity assignment for topology version.
+     * @param mvccCrd Mvcc coordinator.
+     */
+    public void initialize(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) {
         assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + 
", last=" + lastVersion() + ']';
         assert idealAssignment != null;
+        assert mvccCrd == null || topVer.compareTo(mvccCrd.topologyVersion()) 
>= 0 : "[mvccCrd=" + mvccCrd + ", topVer=" + topVer + ']';
 
-        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, 
affAssignment, idealAssignment);
+        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, 
affAssignment, idealAssignment, mvccCrd);
 
         affCache.put(topVer, new HistoryAffinityAssignment(assignment));
         head.set(assignment);
@@ -570,7 +585,9 @@ public class GridAffinityAssignmentCache {
 
         idealAssignment(aff.idealAssignment());
 
-        initialize(aff.lastVersion(), aff.assignments(aff.lastVersion()));
+        AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion());
+
+        initialize(aff.lastVersion(), assign.assignment(), 
assign.mvccCoordinator());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 9c9fb8f..3a142c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -384,7 +384,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             try {
                 GridAffinityAssignment assign = assign0 instanceof 
GridAffinityAssignment ?
                     (GridAffinityAssignment)assign0 :
-                    new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment());
+                    new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment(), assign0.mvccCoordinator());
 
                 AffinityInfo info = new AffinityInfo(
                     cctx.config().getAffinity(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index abd5292..15d7e4e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -184,7 +184,7 @@ class GridAffinityUtils {
 
             GridAffinityAssignment assign = assign0 instanceof 
GridAffinityAssignment ?
                 (GridAffinityAssignment)assign0 :
-                new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment());
+                new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment(), assign0.mvccCoordinator());
 
             return F.t(
                 affinityMessage(ctx, cctx.config().getAffinity()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index e502dd5..d9c03e5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -43,17 +44,26 @@ public class HistoryAffinityAssignment implements 
AffinityAssignment {
     /** */
     private final boolean clientEvtChange;
 
+    /** */
+    private final MvccCoordinator mvccCrd;
+
     /**
      * @param assign Assignment.
      */
-    public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+    HistoryAffinityAssignment(GridAffinityAssignment assign) {
         this.topVer = assign.topologyVersion();
         this.assignment = assign.assignment();
         this.idealAssignment = assign.idealAssignment();
+        this.mvccCrd = assign.mvccCoordinator();
         this.clientEvtChange = assign.clientEventChange();
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean clientEventChange() {
         return clientEvtChange;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 741e204..1f9890c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -57,6 +57,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridLongList;
@@ -448,7 +449,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         if (grpHolder.client()) {
                             ClientCacheDhtTopologyFuture topFut = new 
ClientCacheDhtTopologyFuture(topVer);
 
-                            grp.topology().updateTopologyVersion(topFut, 
discoCache, -1, false);
+                            grp.topology().updateTopologyVersion(topFut,
+                                discoCache,
+                                cctx.coordinators().currentCoordinator(),
+                                -1,
+                                false);
 
                             grpHolder = new CacheGroupHolder1(grp, 
grpHolder.affinity());
 
@@ -495,6 +500,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 assert grp != null;
 
                 GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer,
+                    cctx.coordinators().currentCoordinator(),
                     null,
                     discoCache,
                     grp.affinity(),
@@ -517,7 +523,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         new ClusterTopologyServerNotFoundException("All server 
nodes left grid."));
                 }
 
-                grp.topology().updateTopologyVersion(topFut, discoCache, -1, 
false);
+                grp.topology().updateTopologyVersion(topFut,
+                    discoCache,
+                    cctx.coordinators().currentCoordinator(),
+                    -1,
+                    false);
 
                 grp.topology().update(topVer, partMap, null, 
Collections.<Integer>emptySet(), null);
 
@@ -1180,6 +1190,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             fetchFut.init(false);
 
             fetchAffinity(evts.topologyVersion(),
+                cctx.coordinators().currentCoordinator(),
                 evts.lastEvent(),
                 evts.discoveryCache(),
                 aff, fetchFut);
@@ -1528,6 +1539,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             int grpId = fetchFut.groupId();
 
             fetchAffinity(topVer,
+                cctx.coordinators().currentCoordinator(),
                 fut.events().lastEvent(),
                 fut.events().discoveryCache(),
                 cctx.cache().cacheGroup(grpId).affinity(),
@@ -1537,6 +1549,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
     /**
      * @param topVer Topology version.
+     * @param mvccCrd Mvcc coordinator to set in affinity.
      * @param discoveryEvt Discovery event.
      * @param discoCache Discovery data cache.
      * @param affCache Affinity.
@@ -1544,7 +1557,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      * @return Affinity assignment response.
      */
-    private GridDhtAffinityAssignmentResponse 
fetchAffinity(AffinityTopologyVersion topVer,
+    private GridDhtAffinityAssignmentResponse fetchAffinity(
+        AffinityTopologyVersion topVer,
+        MvccCoordinator mvccCrd,
         @Nullable DiscoveryEvent discoveryEvt,
         DiscoCache discoCache,
         GridAffinityAssignmentCache affCache,
@@ -1557,7 +1572,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         if (res == null) {
             List<List<ClusterNode>> aff = affCache.calculate(topVer, 
discoveryEvt, discoCache);
 
-            affCache.initialize(topVer, aff);
+            affCache.initialize(topVer, aff, mvccCrd);
         }
         else {
             List<List<ClusterNode>> idealAff = 
res.idealAffinityAssignment(discoCache);
@@ -1574,7 +1589,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
             assert aff != null : res;
 
-            affCache.initialize(topVer, aff);
+            affCache.initialize(topVer, aff, mvccCrd);
         }
 
         return res;
@@ -1624,7 +1639,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      * @return Future completed when caches initialization is done.
      */
-    public IgniteInternalFuture<?> initCoordinatorCaches(final 
GridDhtPartitionsExchangeFuture fut,
+    public IgniteInternalFuture<?> initCoordinatorCaches(
+        final GridDhtPartitionsExchangeFuture fut,
         final boolean newAff) throws IgniteCheckedException {
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new 
ArrayList<>();
 
@@ -1692,6 +1708,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                             @Override public void 
applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
                                 throws IgniteCheckedException {
                                 fetchAffinity(prev.topologyVersion(),
+                                    null, // Pass null mvcc coordinator, this 
affinity version should be used for queries.
                                     prev.events().lastEvent(),
                                     prev.events().discoveryCache(),
                                     aff,

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 4046c98..55ffdaf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -51,11 +55,20 @@ public class ExchangeContext {
     /** */
     private final boolean compatibilityNode = 
getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
 
+    /** */
+    private final boolean newMvccCrd;
+
+    /** Currently running mvcc queries, initialized when mvcc coordinator is 
changed. */
+    private Map<UUID, Map<MvccCounter, Integer>> activeQueries;
+
     /**
      * @param crd Coordinator flag.
+     * @param newMvccCrd {@code True} if new coordinator assigned during this 
exchange.
      * @param fut Exchange future.
      */
-    public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
+    public ExchangeContext(boolean crd, boolean newMvccCrd, 
GridDhtPartitionsExchangeFuture fut) {
+        this.newMvccCrd = newMvccCrd;
+
         int protocolVer = 
exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
 
         if (compatibilityNode || (crd && fut.localJoinExchange())) {
@@ -124,6 +137,34 @@ public class ExchangeContext {
         return merge;
     }
 
+    /**
+     * @return {@code True} if new node assigned as mvcc coordinator node 
during this exchange.
+     */
+    public boolean newMvccCoordinator() {
+        return newMvccCrd;
+    }
+
+    /**
+     * @return Active queries.
+     */
+    public Map<UUID, Map<MvccCounter, Integer>> activeQueries() {
+        return activeQueries;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param nodeQueries Node queries.
+     */
+    public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, 
Integer> nodeQueries) {
+        if (nodeQueries == null)
+            return;
+
+        if (activeQueries == null)
+            activeQueries = new HashMap<>();
+
+        activeQueries.put(nodeId, nodeQueries);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ExchangeContext.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 702b848..91e4505 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
@@ -238,6 +239,10 @@ public class GridCacheAffinityManager extends 
GridCacheManagerAdapter {
         return aff0.cachedAffinity(topVer);
     }
 
+    public MvccCoordinator mvccCoordinator(AffinityTopologyVersion topVer) {
+        return assignment(topVer).mvccCoordinator();
+    }
+
     /**
      * @param key Key to check.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index fe9ed29..097d90f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -805,7 +805,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
-    @Nullable public IgniteInternalFuture<?> 
affinityReadyFuture(AffinityTopologyVersion ver) {
+    @Nullable public IgniteInternalFuture<AffinityTopologyVersion> 
affinityReadyFuture(AffinityTopologyVersion ver) {
         GridDhtPartitionsExchangeFuture lastInitializedFut0 = 
lastInitializedFut;
 
         if (lastInitializedFut0 != null && 
lastInitializedFut0.initialVersion().compareTo(ver) == 0) {
@@ -1719,9 +1719,15 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         IgniteTxManager tm = cctx.tm();
 
         if (tm != null) {
-            U.warn(diagnosticLog, "Pending transactions:");
+            boolean first = true;
 
             for (IgniteInternalTx tx : tm.activeTransactions()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending transactions:");
+
+                    first = false;
+                }
+
                 if (exchTopVer != null) {
                     U.warn(diagnosticLog, ">>> [txVer=" + 
tx.topologyVersionSnapshot() +
                         ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) 
+
@@ -1735,31 +1741,66 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         GridCacheMvccManager mvcc = cctx.mvcc();
 
         if (mvcc != null) {
-            U.warn(diagnosticLog, "Pending explicit locks:");
+            boolean first = true;
+
+            for (GridCacheExplicitLockSpan lockSpan : 
mvcc.activeExplicitLocks()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending explicit locks:");
+
+                    first = false;
+                }
 
-            for (GridCacheExplicitLockSpan lockSpan : 
mvcc.activeExplicitLocks())
                 U.warn(diagnosticLog, ">>> " + lockSpan);
+            }
 
-            U.warn(diagnosticLog, "Pending cache futures:");
+            first = true;
+
+            for (GridCacheFuture<?> fut : mvcc.activeFutures()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending cache futures:");
+
+                    first = false;
+                }
 
-            for (GridCacheFuture<?> fut : mvcc.activeFutures())
                 dumpDiagnosticInfo(fut, diagCtx);
+            }
+
+            first = true;
+
+            for (GridCacheFuture<?> fut : mvcc.atomicFutures()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending atomic cache futures:");
 
-            U.warn(diagnosticLog, "Pending atomic cache futures:");
+                    first = false;
+                }
 
-            for (GridCacheFuture<?> fut : mvcc.atomicFutures())
                 dumpDiagnosticInfo(fut, diagCtx);
+            }
+
+            first = true;
 
-            U.warn(diagnosticLog, "Pending data streamer futures:");
+            for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending data streamer futures:");
+
+                    first = false;
+                }
 
-            for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
                 dumpDiagnosticInfo(fut, diagCtx);
+            }
 
             if (tm != null) {
-                U.warn(diagnosticLog, "Pending transaction deadlock detection 
futures:");
+                first = true;
+
+                for (IgniteInternalFuture<?> fut : 
tm.deadlockDetectionFutures()) {
+                    if (first) {
+                        U.warn(diagnosticLog, "Pending transaction deadlock 
detection futures:");
+
+                        first = false;
+                    }
 
-                for (IgniteInternalFuture<?> fut : 
tm.deadlockDetectionFutures())
                     dumpDiagnosticInfo(fut, diagCtx);
+                }
             }
         }
 
@@ -1781,6 +1822,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     affDumpCnt++;
             }
         }
+
+        cctx.kernalContext().coordinators().dumpDebugInfo(diagnosticLog, 
diagCtx);
     }
 
     /**
@@ -1949,6 +1992,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                     ClusterNode node = evt.eventNode();
 
+                    if ((evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT) &&
+                        node.equals(cctx.coordinators().currentCoordinator())) 
{
+                        if (log.isInfoEnabled())
+                            log.info("Stop merge, need exchange for mvcc 
coordinator failure: " + node);
+
+                        break;
+                    }
                     if (!curFut.context().supportsMergeExchanges(node)) {
                         if (log.isInfoEnabled())
                             log.info("Stop merge, node does not support merge: 
" + node);

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index dc24586..2af7fd8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -86,7 +86,6 @@ import 
org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import 
org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
@@ -2176,7 +2175,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     @SuppressWarnings("unchecked")
     private GridCacheSharedContext createSharedContext(GridKernalContext 
kernalCtx,
         Collection<CacheStoreSessionListener> storeSesLsnrs) throws 
IgniteCheckedException {
-        CacheCoordinatorsSharedManager coord = new 
CacheCoordinatorsSharedManager();
         IgniteTxManager tm = new IgniteTxManager();
         GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
         GridCacheVersionManager verMgr = new GridCacheVersionManager();
@@ -2215,7 +2213,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         return new GridCacheSharedContext(
             kernalCtx,
-            coord,
             tm,
             verMgr,
             mvccMgr,

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index bf5b999..f4e4d48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -36,17 +36,15 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -123,9 +121,6 @@ public class GridCacheSharedContext<K, V> {
     /** Ttl cleanup manager. */
     private GridCacheSharedTtlCleanupManager ttlMgr;
 
-    /** Cache mvcc coordinator. */
-    private CacheCoordinatorsSharedManager crd;
-
     /** Cache contexts map. */
     private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap;
 
@@ -170,7 +165,6 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @param kernalCtx  Context.
-     * @param crd Cache mvcc coordinator manager.
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
      * @param mvccMgr MVCC manager.
@@ -184,7 +178,6 @@ public class GridCacheSharedContext<K, V> {
      */
     public GridCacheSharedContext(
         GridKernalContext kernalCtx,
-        CacheCoordinatorsSharedManager crd,
         IgniteTxManager txMgr,
         GridCacheVersionManager verMgr,
         GridCacheMvccManager mvccMgr,
@@ -203,7 +196,6 @@ public class GridCacheSharedContext<K, V> {
         this.kernalCtx = kernalCtx;
 
         setManagers(mgrs,
-            crd,
             txMgr,
             jtaMgr,
             verMgr,
@@ -376,7 +368,6 @@ public class GridCacheSharedContext<K, V> {
         List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
 
         setManagers(mgrs,
-            crd,
             txMgr,
             jtaMgr,
             verMgr,
@@ -416,7 +407,6 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @param mgrs Managers list.
-     * @param coord Cache mvcc coordinator manager.
      * @param txMgr Transaction manager.
      * @param jtaMgr JTA manager.
      * @param verMgr Version manager.
@@ -428,7 +418,6 @@ public class GridCacheSharedContext<K, V> {
      * @param ttlMgr Ttl cleanup manager.
      */
     private void setManagers(List<GridCacheSharedManager<K, V>> mgrs,
-        CacheCoordinatorsSharedManager coord,
         IgniteTxManager txMgr,
         CacheJtaManagerAdapter jtaMgr,
         GridCacheVersionManager verMgr,
@@ -442,7 +431,6 @@ public class GridCacheSharedContext<K, V> {
         CacheAffinitySharedManager affMgr,
         GridCacheIoManager ioMgr,
         GridCacheSharedTtlCleanupManager ttlMgr) {
-        this.crd = add(mgrs, coord);
         this.mvccMgr = add(mgrs, mvccMgr);
         this.verMgr = add(mgrs, verMgr);
         this.txMgr = add(mgrs, txMgr);
@@ -785,8 +773,8 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @return Cache mvcc coordinator manager.
      */
-    public CacheCoordinatorsSharedManager coordinators() {
-        return crd;
+    public CacheCoordinatorsProcessor coordinators() {
+        return kernalCtx.coordinators();
     }
 
     /**
@@ -844,7 +832,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * Captures all ongoing operations that we need to wait before we can 
proceed to the next topology version.
      * This method must be called only after
-     * {@link 
GridDhtPartitionTopology#updateTopologyVersion(GridDhtTopologyFuture, 
DiscoCache, long, boolean)}
+     * {@link GridDhtPartitionTopology#updateTopologyVersion}
      * method is called so that all new updates will wait to switch to the new 
version.
      * This method will capture:
      * <ul>

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index a31f91b..77039cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -474,7 +474,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                     cctx.database().checkpointReadLock();
 
                     try {
-                        assert !txState.mvccEnabled(cctx) || mvccVer != null;
+                        assert !txState.mvccEnabled(cctx) || mvccInfo != null;
 
                         Collection<IgniteTxEntry> entries = near() ? 
allEntries() : writeEntries();
 
@@ -597,7 +597,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                         resolveTaskName(),
                                                         dhtVer,
                                                         
txEntry.updateCounter(),
-                                                        mvccVer);
+                                                        mvccInfo != null ? 
mvccInfo.version() : null);
                                                 else {
                                                     assert val != null : 
txEntry;
 
@@ -622,7 +622,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                         resolveTaskName(),
                                                         dhtVer,
                                                         
txEntry.updateCounter(),
-                                                        mvccVer);
+                                                        mvccInfo != null ? 
mvccInfo.version() : null);
 
                                                     // Keep near entry up to 
date.
                                                     if (nearCached != null) {
@@ -655,7 +655,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                     resolveTaskName(),
                                                     dhtVer,
                                                     txEntry.updateCounter(),
-                                                    mvccVer);
+                                                    mvccInfo != null ? 
mvccInfo.version() : null);
 
                                                 // Keep near entry up to date.
                                                 if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index e994113..e328c25 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -195,9 +196,15 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
         GridDhtTopologyFuture exchFut,
         DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index ee42a14..5dbb3a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1229,6 +1229,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
         if (expVer.equals(curVer))
             return false;
 
+        // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs.
+
         Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
         Collection<ClusterNode> cacheNodes1 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4ae68ef..cf6554a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.jetbrains.annotations.Nullable;
 
@@ -69,6 +70,7 @@ public interface GridDhtPartitionTopology {
     public void updateTopologyVersion(
         GridDhtTopologyFuture exchFut,
         DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
         long updateSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException;
@@ -379,4 +381,6 @@ public interface GridDhtPartitionTopology {
      * @param updateRebalanceVer {@code True} if need check rebalance state.
      */
     public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, 
AffinityAssignment assignment, boolean updateRebalanceVer);
+
+    public MvccCoordinator mvccCoordinator();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 380066a..1f3d00d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -137,6 +138,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     /** */
     private volatile AffinityTopologyVersion rebalancedTopVer = 
AffinityTopologyVersion.NONE;
 
+    /** */
+    private volatile MvccCoordinator mvccCrd;
+
     /**
      * @param ctx Cache shared context.
      * @param grp Cache group.
@@ -229,9 +233,15 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
         GridDhtTopologyFuture exchFut,
         DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
@@ -255,6 +265,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             lastTopChangeVer = exchTopVer;
 
             this.discoCache = discoCache;
+            this.mvccCrd = mvccCrd;
         }
         finally {
             lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index dd00ad1..d624e2c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -295,11 +296,11 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
         GridLongList waitTxs = tx.mvccWaitTransactions();
 
         if (waitTxs != null) {
-            ClusterNode crd = 
cctx.coordinators().coordinator(tx.topologyVersion());
+            TxMvccInfo mvccInfo = tx.mvccInfo();
 
-            assert crd != null;
+            assert mvccInfo != null;
 
-            IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(crd, 
waitTxs);
+            IgniteInternalFuture fut = 
cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
 
             add(fut);
 
@@ -411,7 +412,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
         if (tx.onePhaseCommit())
             return false;
 
-        assert !commit || !tx.txState().mvccEnabled(cctx) || 
tx.mvccCoordinatorVersion() != null;
+        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != 
null;
 
         boolean sync = tx.syncMode() == FULL_SYNC;
 
@@ -469,7 +470,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                 updCntrs,
                 false,
                 false,
-                tx.mvccCoordinatorVersion());
+                tx.mvccInfo());
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : 
tx.xidVersion());
 
@@ -539,7 +540,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                     tx.activeCachesDeploymentEnabled(),
                     false,
                     false,
-                    tx.mvccCoordinatorVersion());
+                    tx.mvccInfo());
 
                 req.writeVersion(tx.writeVersion());
 
@@ -582,6 +583,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
     @SuppressWarnings("unchecked")
     @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext 
ctx) {
         if (!isDone()) {
+            // TODO IGNITE-3478 (mvcc wait txs fut)
             for (IgniteInternalFuture fut : futures()) {
                 if (!fut.isDone()) {
                     MiniFuture f = (MiniFuture)fut;
@@ -608,6 +610,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
 
     /** {@inheritDoc} */
     @Override public String toString() {
+        // TODO IGNITE-3478 (mvcc wait txs fut)
         Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
             @SuppressWarnings("unchecked")
             @Override public String apply(IgniteInternalFuture<?> f) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index df8c951..0df0b66 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -25,7 +25,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -68,7 +68,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
     private GridCacheVersion writeVer;
 
     /** */
-    private MvccCoordinatorVersion mvccVer;
+    private TxMvccInfo mvccInfo;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -126,7 +126,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
         boolean addDepInfo,
         boolean retVal,
         boolean waitRemoteTxs,
-        MvccCoordinatorVersion mvccVer
+        TxMvccInfo mvccInfo
     ) {
         super(
             xidVer,
@@ -155,7 +155,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
         this.nearNodeId = nearNodeId;
         this.isolation = isolation;
         this.miniId = miniId;
-        this.mvccVer = mvccVer;
+        this.mvccInfo = mvccInfo;
 
         needReturnValue(retVal);
         waitRemoteTransactions(waitRemoteTxs);
@@ -213,7 +213,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
         Collection<Long> updateIdxs,
         boolean retVal,
         boolean waitRemoteTxs,
-        MvccCoordinatorVersion mvccVer
+        TxMvccInfo mvccInfo
     ) {
         this(nearNodeId,
             futId,
@@ -239,7 +239,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
             addDepInfo,
             retVal,
             waitRemoteTxs,
-            mvccVer);
+            mvccInfo);
 
         if (updateIdxs != null && !updateIdxs.isEmpty()) {
             partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -252,8 +252,8 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
     /**
      * @return Counter.
      */
-    public MvccCoordinatorVersion mvccCoordinatorVersion() {
-        return mvccVer;
+    public TxMvccInfo mvccInfo() {
+        return mvccInfo;
     }
 
     /**
@@ -382,7 +382,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeMessage("mvccVer", mvccVer))
+                if (!writer.writeMessage("mvccInfo", mvccInfo))
                     return false;
 
                 writer.incrementState();
@@ -448,7 +448,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 23:
-                mvccVer = reader.readMessage("mvccVer");
+                mvccInfo = reader.readMessage("mvccInfo");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index cee5d9b..e4a7141 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -37,8 +37,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 99e1a7a..3143c4f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -61,8 +61,10 @@ import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -258,6 +260,11 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteLogger logger() {
+        return log;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
     }
@@ -872,7 +879,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
             tx.onePhaseCommit(),
             tx.activeCachesDeploymentEnabled());
 
-        res.mvccCoordinatorVersion(tx.mvccCoordinatorVersion());
+        res.mvccInfo(tx.mvccInfo());
 
         if (prepErr == null) {
             if (tx.needReturnValue() || tx.nearOnOriginatingNode() || 
tx.hasInterceptor())
@@ -1229,19 +1236,23 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 }
             }
 
-            IgniteInternalFuture<Long> waitCrdCntrFut = null;
+            IgniteInternalFuture<MvccCoordinatorVersion> waitCrdCntrFut = null;
 
             if (req.requestMvccCounter()) {
+                assert last;
+
                 assert tx.txState().mvccEnabled(cctx);
 
-                ClusterNode crd = 
cctx.coordinators().coordinator(tx.topologyVersion());
+                MvccCoordinator crd = cctx.coordinators().currentCoordinator();
 
                 assert crd != null : tx.topologyVersion();
 
-                if (crd.isLocal())
-                    
tx.mvccCoordinatorVersion(cctx.coordinators().requestTxCounterOnCoordinator(tx));
+                if (crd.nodeId().equals(cctx.localNodeId()))
+                    onMvccResponse(cctx.localNodeId(), 
cctx.coordinators().requestTxCounterOnCoordinator(tx));
                 else {
-                    IgniteInternalFuture<Long> crdCntrFut = 
cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
+                    IgniteInternalFuture<MvccCoordinatorVersion> crdCntrFut = 
cctx.coordinators().requestTxCounter(crd,
+                        this,
+                        tx.nearXidVersion());
 
                     if (tx.onePhaseCommit())
                         waitCrdCntrFut = crdCntrFut;
@@ -1271,23 +1282,23 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 if (waitCrdCntrFut != null) {
                     skipInit = true;
 
-                    waitCrdCntrFut.listen(new 
IgniteInClosure<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> 
fut) {
+                    waitCrdCntrFut.listen(new 
IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
+                        @Override public void 
apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
                             try {
                                 fut.get();
 
                                 sendPrepareRequests();
+
+                                markInitialized();
                             }
                             catch (Throwable e) {
-                                U.error(log, "Failed to get coordinator 
counter: " + e, e);
+                                U.error(log, "Failed to get mvcc version for 
tx [txId=" + tx.nearXidVersion() +
+                                    ", err=" + e + ']', e);
 
                                 GridNearTxPrepareResponse res = 
createPrepareResponse(e);
 
                                 onDone(res, res.error());
                             }
-                            finally {
-                                markInitialized();
-                            }
                         }
                     });
                 }
@@ -1302,8 +1313,8 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
     }
 
     /** {@inheritDoc} */
-    @Override public void onMvccResponse(MvccCoordinatorVersion res) {
-        tx.mvccCoordinatorVersion(res);
+    @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion 
res) {
+        tx.mvccInfo(new TxMvccInfo(crdId, res));
     }
 
     /** {@inheritDoc} */
@@ -1325,7 +1336,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
             }
         }
 
-        assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || 
tx.mvccCoordinatorVersion() != null;
+        assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || 
tx.mvccInfo() != null;
 
         int miniId = 0;
 
@@ -1376,7 +1387,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 tx.activeCachesDeploymentEnabled(),
                 tx.storeWriteThrough(),
                 retVal,
-                tx.mvccCoordinatorVersion());
+                tx.mvccInfo());
 
             int idx = 0;
 
@@ -1490,7 +1501,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                     tx.activeCachesDeploymentEnabled(),
                     tx.storeWriteThrough(),
                     retVal,
-                    tx.mvccCoordinatorVersion());
+                    tx.mvccInfo());
 
                 for (IgniteTxEntry entry : nearMapping.entries()) {
                     if (CU.writes().apply(entry)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index da7f831..e099a32 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -34,7 +34,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -105,7 +105,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
     private List<IgniteTxKey> nearWritesCacheMissed;
 
     /** */
-    private MvccCoordinatorVersion mvccVer;
+    private TxMvccInfo mvccInfo;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -146,7 +146,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
         boolean addDepInfo,
         boolean storeWriteThrough,
         boolean retVal,
-        MvccCoordinatorVersion mvccVer) {
+        TxMvccInfo mvccInfo) {
         super(tx,
             timeout,
             null,
@@ -175,14 +175,14 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
 
         nearNodeId = tx.nearNodeId();
 
-        this.mvccVer = mvccVer;
+        this.mvccInfo = mvccInfo;
     }
 
     /**
-     * @return Counter.
+     * @return Mvcc info.
      */
-    public MvccCoordinatorVersion mvccCoordinatorVersion() {
-        return mvccVer;
+    public TxMvccInfo mvccInfo() {
+        return mvccInfo;
     }
 
     /**
@@ -421,7 +421,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeMessage("mvccVer", mvccVer))
+                if (!writer.writeMessage("mvccInfo", mvccInfo))
                     return false;
 
                 writer.incrementState();
@@ -521,7 +521,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 23:
-                mvccVer = reader.readMessage("mvccVer");
+                mvccInfo = reader.readMessage("mvccInfo");
 
                 if (!reader.isLastRead())
                     return false;

Reply via email to