ignite-3479

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0196b00
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0196b00
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0196b00

Branch: refs/heads/ignite-3479
Commit: e0196b003f628eeaf540ebec9760bc6dcafaee4c
Parents: 4e7f19e
Author: sboikov <[email protected]>
Authored: Wed Sep 27 17:52:18 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Sep 27 17:58:34 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContext.java      |    4 +-
 .../ignite/internal/GridKernalContextImpl.java  |   10 +-
 .../apache/ignite/internal/IgniteKernal.java    |    4 +-
 .../processors/cache/GridCacheProcessor.java    |    1 -
 .../cache/GridCacheSharedContext.java           |    4 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |    2 -
 .../GridNearPessimisticTxPrepareFuture.java     |    8 +-
 .../mvcc/CacheCoordinatorsDiscoveryData.java    |   42 +
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 1120 ++++++++++++++++++
 .../mvcc/CacheCoordinatorsSharedManager.java    | 1100 -----------------
 .../wal/reader/StandaloneGridKernalContext.java |    4 +-
 .../cache/tree/AbstractDataInnerIO.java         |    6 +-
 .../cache/tree/AbstractDataLeafIO.java          |    6 +-
 .../processors/cache/tree/CacheDataTree.java    |    4 +-
 .../cache/tree/CacheIdAwareDataInnerIO.java     |    4 +-
 .../cache/tree/CacheIdAwareDataLeafIO.java      |    4 +-
 .../processors/cache/tree/DataInnerIO.java      |    4 +-
 .../processors/cache/tree/DataLeafIO.java       |    4 +-
 .../processors/cache/tree/MvccDataRow.java      |    4 +-
 .../processors/cache/tree/SearchRow.java        |    4 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |    4 +-
 21 files changed, 1201 insertions(+), 1142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 971be7e..88251aa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -34,7 +34,7 @@ import 
org.apache.ignite.internal.managers.indexing.GridIndexingManager;
 import 
org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -648,5 +648,5 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
     /**
      * @return Cache mvcc coordinator processor.
      */
-    public CacheCoordinatorsSharedManager coordinators();
+    public CacheCoordinatorsProcessor coordinators();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1715887..86c0adc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -49,7 +49,7 @@ import 
org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -285,7 +285,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** Cache mvcc coordinators. */
     @GridToStringExclude
-    private CacheCoordinatorsSharedManager coordProc;
+    private CacheCoordinatorsProcessor coordProc;
 
     /** */
     @GridToStringExclude
@@ -584,8 +584,8 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
             poolProc = (PoolProcessor) comp;
         else if (comp instanceof GridMarshallerMappingProcessor)
             mappingProc = (GridMarshallerMappingProcessor)comp;
-        else if (comp instanceof CacheCoordinatorsSharedManager)
-            coordProc = (CacheCoordinatorsSharedManager)comp;
+        else if (comp instanceof CacheCoordinatorsProcessor)
+            coordProc = (CacheCoordinatorsProcessor)comp;
         else if (!(comp instanceof DiscoveryNodeValidationProcessor
                 || comp instanceof PlatformPluginProcessor))
             assert (comp instanceof GridPluginComponent) : "Unknown manager 
class: " + comp.getClass();
@@ -841,7 +841,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public CacheCoordinatorsSharedManager coordinators() {
+    @Override public CacheCoordinatorsProcessor coordinators() {
         return coordProc;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 7b833bc..9a6972b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -114,7 +114,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
@@ -938,7 +938,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
             // Start processors before discovery manager, so they will
             // be able to start receiving messages once discovery completes.
             try {
-                startProcessor(new CacheCoordinatorsSharedManager(ctx));
+                startProcessor(new CacheCoordinatorsProcessor(ctx));
                 
startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
                 startProcessor(new GridAffinityProcessor(ctx));
                 
startProcessor(createComponent(GridSegmentationProcessor.class, ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e52c56c..2af7fd8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -86,7 +86,6 @@ import 
org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import 
org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 1cdee39..f4e4d48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -44,7 +44,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
@@ -773,7 +773,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @return Cache mvcc coordinator manager.
      */
-    public CacheCoordinatorsSharedManager coordinators() {
+    public CacheCoordinatorsProcessor coordinators() {
         return kernalCtx.coordinators();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index cee5d9b..e4a7141 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -37,8 +37,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index c6192d9..0664b1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -36,7 +36,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
@@ -490,9 +490,9 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
                         ", loc=" + ((MiniFuture)f).primary().isLocal() +
                         ", done=" + f.isDone() + "]";
                 }
-                else if (f instanceof 
CacheCoordinatorsSharedManager.MvccVersionFuture) {
-                    CacheCoordinatorsSharedManager.MvccVersionFuture crdFut =
-                        (CacheCoordinatorsSharedManager.MvccVersionFuture)f;
+                else if (f instanceof 
CacheCoordinatorsProcessor.MvccVersionFuture) {
+                    CacheCoordinatorsProcessor.MvccVersionFuture crdFut =
+                        (CacheCoordinatorsProcessor.MvccVersionFuture)f;
 
                     return "[mvccCrdNode=" + crdFut.crdId +
                         ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) +

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
new file mode 100644
index 0000000..39baec9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public class CacheCoordinatorsDiscoveryData implements Serializable {
+    /** */
+    private MvccCoordinator crd;
+
+    /**
+     * @param crd Coordinator.
+     */
+    public CacheCoordinatorsDiscoveryData(MvccCoordinator crd) {
+        this.crd = crd;
+    }
+
+    /**
+     * @return Current coordinator.
+     */
+    public MvccCoordinator coordinator() {
+        return crd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
new file mode 100644
index 0000000..5f5da20
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -0,0 +1,1120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.LongAdder8;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ *
+ */
+public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
+    /** */
+    public static final long COUNTER_NA = 0L;
+
+    /** */
+    private static final boolean STAT_CNTRS = false;
+
+    /** */
+    private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR;
+
+    /** */
+    private static final byte MSG_POLICY = SYSTEM_POOL;
+    
+    /** */
+    private volatile MvccCoordinator curCrd;
+
+    /** */
+    private final AtomicLong mvccCntr = new AtomicLong(1L);
+
+    /** */
+    private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
+
+    /** */
+    private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = 
new ConcurrentSkipListMap<>();
+
+    /** */
+    private final ConcurrentMap<Long, AtomicInteger> activeQueries = new 
ConcurrentHashMap<>();
+
+    /** */
+    private final PreviousCoordinatorQueries prevCrdQueries = new 
PreviousCoordinatorQueries();
+
+    /** */
+    private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new 
ConcurrentHashMap<>();
+
+    /** */
+    private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new 
ConcurrentHashMap<>();
+
+    /** */
+    private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new 
ConcurrentHashMap<>();
+
+    /** */
+    private final AtomicLong futIdCntr = new AtomicLong();
+
+    /** */
+    private final CountDownLatch crdLatch = new CountDownLatch(1);
+
+    /** Topology version when local node was assigned as coordinator. */
+    private long crdVer;
+
+    /** */
+    private StatCounter[] statCntrs;
+
+    /**
+     * @param ver1 First version.
+     * @param ver2 Second version.
+     * @return Comparison result.
+     */
+    public static int compareVersions(MvccCoordinatorVersion ver1, 
MvccCoordinatorVersion ver2) {
+        assert ver1 != null;
+        assert ver2 != null;
+
+        int cmp = Long.compare(ver1.coordinatorVersion(), 
ver2.coordinatorVersion());
+
+        if (cmp != 0)
+            return cmp;
+
+        return Long.compare(ver1.counter(), ver2.counter());
+    }
+
+    /** */
+    private CacheCoordinatorsDiscoveryData discoData;
+
+    /**
+     * @param ctx Context.
+     */
+    public CacheCoordinatorsProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return DiscoveryDataExchangeType.CACHE_CRD_PROC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        Integer cmpId = DiscoveryDataExchangeType.CACHE_CRD_PROC.ordinal();
+
+        if (!dataBag.commonDataCollectedFor(cmpId))
+            dataBag.addGridCommonData(cmpId, discoData);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void 
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        discoData = (CacheCoordinatorsDiscoveryData)data.commonData();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        statCntrs = new StatCounter[7];
+
+        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", 
"avgTxs");
+        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", 
"avgFutTime");
+        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", 
"avgFutTime");
+        statCntrs[4] = new StatCounter("TotalRequests");
+        statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
+        statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", 
"avgFutTime");
+
+        ctx.event().addLocalEventListener(new 
CacheCoordinatorNodeFailListener(),
+            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        ctx.io().addMessageListener(MSG_TOPIC, new 
CoordinatorMessageListener());
+    }
+    
+    /**
+     * @param log Logger.
+     */
+    public void dumpStatistics(IgniteLogger log) {
+        if (STAT_CNTRS) {
+            log.info("Mvcc coordinator statistics: ");
+
+            for (StatCounter cntr : statCntrs)
+                cntr.dumpInfo(log);
+        }
+    }
+
+    /**
+     * @param tx Transaction.
+     * @return Counter.
+     */
+    public MvccCoordinatorVersion 
requestTxCounterOnCoordinator(IgniteInternalTx tx) {
+        assert ctx.localNodeId().equals(currentCoordinatorId());
+
+        return assignTxCounter(tx.nearXidVersion(), 0L);
+    }
+
+    /**
+     * @param crd Coordinator.
+     * @param lsnr Response listener.
+     * @param txVer Transaction version.
+     * @return Counter request future.
+     */
+    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestTxCounter(MvccCoordinator crd,
+        MvccResponseListener lsnr,
+        GridCacheVersion txVer) {
+        assert !ctx.localNodeId().equals(crd.nodeId());
+
+        MvccVersionFuture fut = new 
MvccVersionFuture(futIdCntr.incrementAndGet(),
+            crd.nodeId(),
+            lsnr);
+
+        verFuts.put(fut.id, fut);
+
+        try {
+            ctx.io().sendToGridTopic(crd.nodeId(),
+                MSG_TOPIC,
+                new CoordinatorTxCounterRequest(fut.id, txVer),
+                MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            fut.onError(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param crd Coordinator.
+     * @param mvccVer Query version.
+     */
+    public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion 
mvccVer) {
+        assert crd != null;
+
+        long trackCntr = mvccVer.counter();
+
+        MvccLongList txs = mvccVer.activeTransactions();
+
+        if (txs != null) {
+            for (int i = 0; i < txs.size(); i++) {
+                long txId = txs.get(i);
+
+                if (txId < trackCntr)
+                    trackCntr = txId;
+            }
+        }
+
+        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() 
? new CoordinatorQueryAckRequest(trackCntr) :
+            new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), 
trackCntr);
+
+        try {
+            ctx.io().sendToGridTopic(crd.nodeId(),
+                MSG_TOPIC,
+                msg,
+                MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send query ack, node left [crd=" + crd + 
", msg=" + msg + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + 
msg + ']', e);
+        }
+    }
+
+    /**
+     * @param crd Coordinator.
+     * @return Counter request future.
+     */
+    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestQueryCounter(MvccCoordinator crd) {
+        assert crd != null;
+
+        // TODO IGNITE-3478: special case for local?
+        MvccVersionFuture fut = new 
MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null);
+
+        verFuts.put(fut.id, fut);
+
+        try {
+            ctx.io().sendToGridTopic(crd.nodeId(),
+                MSG_TOPIC,
+                new CoordinatorQueryVersionRequest(fut.id),
+                MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            if (verFuts.remove(fut.id) != null)
+                fut.onDone(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param crdId Coordinator ID.
+     * @param txs Transaction IDs.
+     * @return Future.
+     */
+    public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList 
txs) {
+        assert crdId != null;
+        assert txs != null && txs.size() > 0;
+
+        // TODO IGNITE-3478: special case for local?
+
+        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crdId, false);
+
+        ackFuts.put(fut.id, fut);
+
+        try {
+            ctx.io().sendToGridTopic(crdId,
+                MSG_TOPIC,
+                new CoordinatorWaitTxsRequest(fut.id, txs),
+                MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (ackFuts.remove(fut.id) != null)
+                fut.onDone(); // No need to ack, finish without error.
+        }
+        catch (IgniteCheckedException e) {
+            if (ackFuts.remove(fut.id) != null)
+                fut.onDone(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param crd Coordinator.
+     * @param mvccVer Transaction version.
+     * @return Acknowledge future.
+     */
+    public IgniteInternalFuture<Void> ackTxCommit(UUID crd, 
MvccCoordinatorVersion mvccVer) {
+        assert crd != null;
+        assert mvccVer != null;
+
+        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crd, true);
+
+        ackFuts.put(fut.id, fut);
+
+        try {
+            ctx.io().sendToGridTopic(crd,
+                MSG_TOPIC,
+                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
+                MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (ackFuts.remove(fut.id) != null)
+                fut.onDone(); // No need to ack, finish without error.
+        }
+        catch (IgniteCheckedException e) {
+            if (ackFuts.remove(fut.id) != null)
+                fut.onDone(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param crdId Coordinator node ID.
+     * @param mvccVer Transaction version.
+     */
+    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
+        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, 
mvccVer.counter());
+
+        msg.skipResponse(true);
+
+        try {
+            ctx.io().sendToGridTopic(crdId,
+                MSG_TOPIC,
+                msg,
+                MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx rollback ack, node left [msg=" + 
msg + ", node=" + crdId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", 
node=" + crdId + ']', e);
+        }
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorTxCounterRequest(UUID nodeId, 
CoordinatorTxCounterRequest msg) {
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignore tx counter request processing, node left 
[msg=" + msg + ", node=" + nodeId + ']');
+
+            return;
+        }
+
+        MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), 
msg.futureId());
+
+        if (STAT_CNTRS)
+            statCntrs[0].update(res.size());
+
+        try {
+            ctx.io().sendToGridTopic(node,
+                MSG_TOPIC,
+                res,
+                MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx counter response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx counter response [msg=" + msg + ", 
node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     *
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorQueryVersionRequest(UUID nodeId, 
CoordinatorQueryVersionRequest msg) {
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignore query counter request processing, node left 
[msg=" + msg + ", node=" + nodeId + ']');
+
+            return;
+        }
+
+        MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, 
msg.futureId());
+
+        try {
+            ctx.io().sendToGridTopic(node,
+                MSG_TOPIC,
+                res,
+                MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send query counter response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
+
+            onNodeFailed(nodeId);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send query counter response [msg=" + msg + 
", node=" + nodeId + ']', e);
+
+            onQueryDone(res.counter());
+        }
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorVersionResponse(UUID nodeId, 
MvccCoordinatorVersionResponse msg) {
+        MvccVersionFuture fut = verFuts.remove(msg.futureId());
+
+        if (fut != null) {
+            if (STAT_CNTRS)
+                statCntrs[1].update((System.nanoTime() - fut.startTime) * 
1000);
+
+            fut.onResponse(msg);
+        }
+        else {
+            if (ctx.discovery().alive(nodeId))
+                U.warn(log, "Failed to find query version future [node=" + 
nodeId + ", msg=" + msg + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Failed to find query version future [node=" + 
nodeId + ", msg=" + msg + ']');
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest 
msg) {
+        onQueryDone(msg.counter());
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void processNewCoordinatorQueryAckRequest(UUID nodeId, 
NewCoordinatorQueryAckRequest msg) {
+        prevCrdQueries.onQueryDone(nodeId, msg);
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorTxAckRequest(UUID nodeId, 
CoordinatorTxAckRequest msg) {
+        onTxDone(msg.txCounter());
+
+        if (STAT_CNTRS)
+            statCntrs[2].update();
+
+        if (!msg.skipResponse()) {
+            try {
+                ctx.io().sendToGridTopic(nodeId,
+                    MSG_TOPIC,
+                    new CoordinatorFutureResponse(msg.futureId()),
+                    MSG_POLICY);
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send tx ack response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send tx ack response [msg=" + msg + ", 
node=" + nodeId + ']', e);
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorAckResponse(UUID nodeId, 
CoordinatorFutureResponse msg) {
+        WaitAckFuture fut = ackFuts.remove(msg.futureId());
+
+        if (fut != null) {
+            if (STAT_CNTRS) {
+                StatCounter cntr = fut.ackTx ? statCntrs[3] : statCntrs[6];
+
+                cntr.update((System.nanoTime() - fut.startTime) * 1000);
+            }
+
+            fut.onResponse();
+        }
+        else {
+            if (ctx.discovery().alive(nodeId))
+                U.warn(log, "Failed to find tx ack future [node=" + nodeId + 
", msg=" + msg + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Failed to find tx ack future [node=" + nodeId + ", 
msg=" + msg + ']');
+        }
+    }
+
+    /**
+     * @param txId Transaction ID.
+     * @return Counter.
+     */
+    private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion 
txId, long futId) {
+        assert crdVer != 0;
+
+        long nextCtr = mvccCntr.incrementAndGet();
+
+        // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
+        MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
+
+        for (Long txVer : activeTxs.keySet())
+            res.addTx(txVer);
+
+        Object old = activeTxs.put(nextCtr, txId);
+
+        assert old == null : txId;
+
+        long cleanupVer;
+
+        if (prevCrdQueries.previousQueriesDone()) {
+            cleanupVer = committedCntr.get() - 1;
+
+            for (Long qryVer : activeQueries.keySet()) {
+                if (qryVer <= cleanupVer)
+                    cleanupVer = qryVer - 1;
+            }
+        }
+        else
+            cleanupVer = -1;
+
+        res.init(futId, crdVer, nextCtr, cleanupVer);
+
+        return res;
+    }
+
+    /**
+     * @param txCntr Counter assigned to transaction.
+     */
+    private void onTxDone(Long txCntr) {
+        GridFutureAdapter fut; // TODO IGNITE-3478.
+
+        GridCacheVersion ver = activeTxs.remove(txCntr);
+
+        assert ver != null;
+
+        committedCntr.setIfGreater(txCntr);
+
+        fut = waitTxFuts.remove(txCntr);
+
+        if (fut != null)
+            fut.onDone();
+    }
+
+    static boolean increment(AtomicInteger cntr) {
+        for (;;) {
+            int current = cntr.get();
+
+            if (current == 0)
+                return false;
+
+            if (cntr.compareAndSet(current, current + 1))
+                return true;
+        }
+    }
+
+    /**
+     * @param qryNodeId Node initiated query.
+     * @return Counter for query.
+     */
+    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, 
long futId) {
+        assert crdVer != 0;
+
+        MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
+
+        Long mvccCntr;
+
+        for(;;) {
+            mvccCntr = committedCntr.get();
+
+            Long trackCntr = mvccCntr;
+
+            for (Long txVer : activeTxs.keySet()) {
+                if (txVer < trackCntr)
+                    trackCntr = txVer;
+
+                res.addTx(txVer);
+            }
+
+            registerActiveQuery(trackCntr);
+
+            if (committedCntr.get() == mvccCntr)
+                break;
+            else {
+                res.resetTransactionsCount();
+
+                onQueryDone(trackCntr);
+            }
+        }
+
+        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+
+        return res;
+    }
+
+    private void registerActiveQuery(Long cntr) {
+        for (;;) {
+            AtomicInteger qryCnt = activeQueries.get(cntr);
+
+            if (qryCnt != null) {
+                boolean inc = increment(qryCnt);
+
+                if (!inc) {
+                    activeQueries.remove(mvccCntr, qryCnt);
+
+                    continue;
+                }
+            }
+            else {
+                qryCnt = new AtomicInteger(1);
+
+                if (activeQueries.putIfAbsent(cntr, qryCnt) != null)
+                    continue;
+            }
+
+            break;
+        }
+    }
+
+    private void onNodeFailed(UUID nodeId) {
+        // TODO
+    }
+
+    /**
+     * @param mvccCntr Query counter.
+     */
+    private void onQueryDone(long mvccCntr) {
+        AtomicInteger cntr = activeQueries.get(mvccCntr);
+
+        assert cntr != null : mvccCntr;
+
+        int left = cntr.decrementAndGet();
+
+        assert left >= 0 : left;
+
+        if (left == 0) {
+            boolean rmv = activeQueries.remove(mvccCntr, cntr);
+
+            assert rmv;
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void processCoordinatorWaitTxsRequest(final UUID nodeId, final 
CoordinatorWaitTxsRequest msg) {
+        statCntrs[5].update();
+
+        GridLongList txs = msg.transactions();
+
+        GridCompoundFuture resFut = null;
+
+        for (int i = 0; i < txs.size(); i++) {
+            Long txId = txs.get(i);
+
+            WaitTxFuture fut = waitTxFuts.get(txId);
+
+            if (fut == null) {
+                WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new 
WaitTxFuture(txId));
+
+                if (old != null)
+                    fut = old;
+            }
+
+            if (!activeTxs.containsKey(txId))
+                fut.onDone();
+
+            if (!fut.isDone()) {
+                if (resFut == null)
+                    resFut = new GridCompoundFuture();
+
+                resFut.add(fut);
+            }
+        }
+
+        if (resFut != null)
+            resFut.markInitialized();
+
+        if (resFut == null || resFut.isDone())
+            sendFutureResponse(nodeId, msg);
+        else {
+            resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    sendFutureResponse(nodeId, msg);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param nodeId
+     * @param msg
+     */
+    private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest 
msg) {
+        try {
+            ctx.io().sendToGridTopic(nodeId,
+                MSG_TOPIC,
+                new CoordinatorFutureResponse(msg.futureId()),
+                MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx ack response, node left [msg=" + 
msg + ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx ack response [msg=" + msg + ", 
node=" + nodeId + ']', e);
+        }
+    }
+
+    public MvccCoordinator currentCoordinator() {
+        return curCrd;
+    }
+
+    public UUID currentCoordinatorId() {
+        MvccCoordinator curCrd = this.curCrd;
+
+        return curCrd != null ? curCrd.nodeId() : null;
+    }
+
+    /**
+     * @param topVer Cache affinity version (used for assert).
+     * @return Coordinator.
+     */
+    public MvccCoordinator 
currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
+        MvccCoordinator crd = curCrd;
+
+        // Assert coordinator did not already change.
+        assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 :
+            "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']';
+
+        return crd;
+    }
+
+    /**
+     * @param discoCache Discovery snapshot.
+     * @return New coordinator.
+     */
+    public MvccCoordinator reassignCoordinator(DiscoCache discoCache) {
+        assert curCrd == null || 
!F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd;
+
+        if (!discoCache.serverNodes().isEmpty()) {
+            ClusterNode node = discoCache.serverNodes().get(0);
+
+            curCrd = new MvccCoordinator(node.id(),
+                discoCache.version().topologyVersion(),
+                discoCache.version());
+
+            log.info("Assigned mvcc coordinator: " + curCrd);
+        }
+        else {
+            curCrd = null;
+
+            log.info("New mvcc coordinator was not assigned [topVer=" + 
discoCache.version() + ']');
+        }
+
+        return curCrd;
+    }
+
+    /**
+     * @param nodeId Node ID
+     * @param activeQueries
+     */
+    public void processClientActiveQueries(UUID nodeId,
+        @Nullable Map<MvccCounter, Integer> activeQueries) {
+        prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param activeQueries Current queries.
+     */
+    public void initCoordinator(AffinityTopologyVersion topVer,
+        DiscoCache discoCache,
+        Map<UUID, Map<MvccCounter, Integer>> activeQueries)
+    {
+        assert ctx.localNodeId().equals(curCrd.nodeId());
+
+        log.info("Initialize local node as mvcc coordinator [node=" + 
ctx.localNodeId() +
+            ", topVer=" + topVer + ']');
+
+        crdVer = topVer.topologyVersion();
+
+        prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
+
+        crdLatch.countDown();
+    }
+
+    /**
+     *
+     */
+    public class MvccVersionFuture extends 
GridFutureAdapter<MvccCoordinatorVersion> {
+        /** */
+        private final Long id;
+
+        /** */
+        private MvccResponseListener lsnr;
+
+        /** */
+        public final UUID crdId;
+
+        /** */
+        long startTime;
+
+        /**
+         * @param id Future ID.
+         * @param crdId Coordinator node ID.
+         */
+        MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener 
lsnr) {
+            this.id = id;
+            this.crdId = crdId;
+            this.lsnr = lsnr;
+
+            if (STAT_CNTRS)
+                startTime = System.nanoTime();
+        }
+
+        /**
+         * @param res Response.
+         */
+        void onResponse(MvccCoordinatorVersionResponse res) {
+            assert res.counter() != COUNTER_NA;
+
+            if (lsnr != null)
+                lsnr.onMvccResponse(crdId, res);
+
+            onDone(res);
+        }
+
+        void onError(IgniteCheckedException err) {
+            if (verFuts.remove(id) != null) {
+                if (lsnr != null)
+                    lsnr.onMvccError(err);
+
+                onDone(err);
+            }
+        }
+
+        /**
+         * @param nodeId Failed node ID.
+         */
+        void onNodeLeft(UUID nodeId) {
+            if (crdId.equals(nodeId)) {
+                ClusterTopologyCheckedException err = new 
ClusterTopologyCheckedException("Failed to request coordinator version, " +
+                    "coordinator failed: " + nodeId);
+
+                onError(err);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
+        }
+    }
+
+    /**
+     *
+     */
+    private class WaitAckFuture extends GridFutureAdapter<Void> {
+        /** */
+        private final long id;
+
+        /** */
+        private final UUID crdId;
+
+        /** */
+        long startTime;
+
+        /** */
+        final boolean ackTx;
+
+        /**
+         * @param id Future ID.
+         * @param crdId Coordinator node ID.
+         */
+        WaitAckFuture(long id, UUID crdId, boolean ackTx) {
+            this.id = id;
+            this.crdId = crdId;
+            this.ackTx = ackTx;
+
+            if (STAT_CNTRS)
+                startTime = System.nanoTime();
+        }
+
+        /**
+         *
+         */
+        void onResponse() {
+            onDone();
+        }
+
+        /**
+         * @param nodeId Failed node ID.
+         */
+        void onNodeLeft(UUID nodeId) {
+            if (crdId.equals(nodeId) && verFuts.remove(id) != null)
+                onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "WaitAckFuture [crdId=" + crdId + ", id=" + id + ']';
+        }
+    }
+
+    /**
+     *
+     */
+    private class CacheCoordinatorNodeFailListener implements 
GridLocalEventListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            assert evt instanceof DiscoveryEvent : evt;
+
+            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+            UUID nodeId = discoEvt.eventNode().id();
+
+            for (MvccVersionFuture fut : verFuts.values())
+                fut.onNodeLeft(nodeId);
+
+            for (WaitAckFuture fut : ackFuts.values())
+                fut.onNodeLeft(nodeId);
+
+            prevCrdQueries.onNodeLeft(nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CacheCoordinatorDiscoveryListener[]";
+        }
+    }
+    /**
+     *
+     */
+    private class CoordinatorMessageListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (STAT_CNTRS)
+                statCntrs[4].update();
+
+            MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg;
+
+            if (msg0.waitForCoordinatorInit()) {
+                if (crdVer == 0) {
+                    try {
+                        U.await(crdLatch);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        U.warn(log, "Failed to wait for coordinator 
initialization, thread interrupted [" +
+                            "msgNode=" + nodeId + ", msg=" + msg + ']');
+
+                        return;
+                    }
+
+                    assert crdVer != 0L;
+                }
+            }
+
+            if (msg instanceof CoordinatorTxCounterRequest)
+                processCoordinatorTxCounterRequest(nodeId, 
(CoordinatorTxCounterRequest)msg);
+            else if (msg instanceof CoordinatorTxAckRequest)
+                processCoordinatorTxAckRequest(nodeId, 
(CoordinatorTxAckRequest)msg);
+            else if (msg instanceof CoordinatorFutureResponse)
+                processCoordinatorAckResponse(nodeId, 
(CoordinatorFutureResponse)msg);
+            else if (msg instanceof CoordinatorQueryAckRequest)
+                
processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
+            else if (msg instanceof CoordinatorQueryVersionRequest)
+                processCoordinatorQueryVersionRequest(nodeId, 
(CoordinatorQueryVersionRequest)msg);
+            else if (msg instanceof MvccCoordinatorVersionResponse)
+                processCoordinatorVersionResponse(nodeId, 
(MvccCoordinatorVersionResponse) msg);
+            else if (msg instanceof CoordinatorWaitTxsRequest)
+                processCoordinatorWaitTxsRequest(nodeId, 
(CoordinatorWaitTxsRequest)msg);
+            else if (msg instanceof NewCoordinatorQueryAckRequest)
+                processNewCoordinatorQueryAckRequest(nodeId, 
(NewCoordinatorQueryAckRequest)msg);
+            else
+                U.warn(log, "Unexpected message received [node=" + nodeId + ", 
msg=" + msg + ']');
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CoordinatorMessageListener[]";
+        }
+    }
+    /**
+     *
+     */
+    static class StatCounter {
+        /** */
+        final String name;
+
+        /** */
+        final LongAdder8 cntr = new LongAdder8();
+
+        public StatCounter(String name) {
+            this.name = name;
+        }
+
+        void update() {
+            cntr.increment();
+        }
+
+        void update(GridLongList arg) {
+            throw new UnsupportedOperationException();
+        }
+
+        void update(long arg) {
+            throw new UnsupportedOperationException();
+        }
+
+        void dumpInfo(IgniteLogger log) {
+            long totalCnt = cntr.sumThenReset();
+
+            if (totalCnt > 0)
+                log.info(name + " [cnt=" + totalCnt + ']');
+        }
+    }
+
+    /**
+     *
+     */
+    static class CounterWithAvg extends StatCounter {
+        /** */
+        final LongAdder8 total = new LongAdder8();
+
+        /** */
+        final String avgName;
+
+        CounterWithAvg(String name, String avgName) {
+            super(name);
+
+            this.avgName = avgName;
+        }
+
+        @Override void update(GridLongList arg) {
+            update(arg != null ? arg.size() : 0);
+        }
+
+        @Override void update(long add) {
+            cntr.increment();
+
+            total.add(add);
+        }
+
+        void dumpInfo(IgniteLogger log) {
+            long totalCnt = cntr.sumThenReset();
+            long totalSum = total.sumThenReset();
+
+            if (totalCnt > 0)
+                log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + 
((float)totalSum / totalCnt) + ']');
+        }
+    }
+
+    /**
+     *
+     */
+    private static class WaitTxFuture extends GridFutureAdapter {
+        /** */
+        private final long txId;
+
+        /**
+         * @param txId Transaction ID.
+         */
+        WaitTxFuture(long txId) {
+            this.txId = txId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
deleted file mode 100644
index 73febc0..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ /dev/null
@@ -1,1100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridComponent;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridAtomicLong;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.LongAdder8;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
-import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-
-/**
- *
- */
-public class CacheCoordinatorsSharedManager extends GridProcessorAdapter {
-    /** */
-    public static final long COUNTER_NA = 0L;
-
-    /** */
-    private static final boolean STAT_CNTRS = false;
-
-    /** */
-    private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR;
-
-    /** */
-    private static final byte MSG_POLICY = SYSTEM_POOL;
-    
-    /** */
-    private volatile MvccCoordinator curCrd;
-
-    /** */
-    private final AtomicLong mvccCntr = new AtomicLong(1L);
-
-    /** */
-    private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
-
-    /** */
-    private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = 
new ConcurrentSkipListMap<>();
-
-    /** */
-    private final ConcurrentMap<Long, AtomicInteger> activeQueries = new 
ConcurrentHashMap<>();
-
-    /** */
-    private final PreviousCoordinatorQueries prevCrdQueries = new 
PreviousCoordinatorQueries();
-
-    /** */
-    private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new 
ConcurrentHashMap<>();
-
-    /** */
-    private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new 
ConcurrentHashMap<>();
-
-    /** */
-    private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new 
ConcurrentHashMap<>();
-
-    /** */
-    private final AtomicLong futIdCntr = new AtomicLong();
-
-    /** */
-    private final CountDownLatch crdLatch = new CountDownLatch(1);
-
-    /** Topology version when local node was assigned as coordinator. */
-    private long crdVer;
-
-    /** */
-    private StatCounter[] statCntrs;
-
-    /**
-     * @param ver1 First version.
-     * @param ver2 Second version.
-     * @return
-     */
-    public static int compareVersions(MvccCoordinatorVersion ver1, 
MvccCoordinatorVersion ver2) {
-        assert ver1 != null;
-        assert ver2 != null;
-
-        int cmp = Long.compare(ver1.coordinatorVersion(), 
ver2.coordinatorVersion());
-
-        if (cmp != 0)
-            return cmp;
-
-        return Long.compare(ver1.counter(), ver2.counter());
-    }
-
-    public CacheCoordinatorsSharedManager(GridKernalContext ctx) {
-        super(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.CACHE_CRD_PROC;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        statCntrs = new StatCounter[7];
-
-        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", 
"avgTxs");
-        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", 
"avgFutTime");
-        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
-        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", 
"avgFutTime");
-        statCntrs[4] = new StatCounter("TotalRequests");
-        statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
-        statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", 
"avgFutTime");
-
-        ctx.event().addLocalEventListener(new 
CacheCoordinatorNodeFailListener(),
-            EVT_NODE_FAILED, EVT_NODE_LEFT);
-
-        ctx.io().addMessageListener(MSG_TOPIC, new 
CoordinatorMessageListener());
-    }
-    
-    /**
-     * @param log Logger.
-     */
-    public void dumpStatistics(IgniteLogger log) {
-        if (STAT_CNTRS) {
-            log.info("Mvcc coordinator statistics: ");
-
-            for (StatCounter cntr : statCntrs)
-                cntr.dumpInfo(log);
-        }
-    }
-
-    /**
-     * @param tx Transaction.
-     * @return Counter.
-     */
-    public MvccCoordinatorVersion 
requestTxCounterOnCoordinator(IgniteInternalTx tx) {
-        assert ctx.localNodeId().equals(currentCoordinatorId());
-
-        return assignTxCounter(tx.nearXidVersion(), 0L);
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param lsnr Response listener.
-     * @param txVer Transaction version.
-     * @return Counter request future.
-     */
-    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestTxCounter(MvccCoordinator crd,
-        MvccResponseListener lsnr,
-        GridCacheVersion txVer) {
-        assert !ctx.localNodeId().equals(crd.nodeId());
-
-        MvccVersionFuture fut = new 
MvccVersionFuture(futIdCntr.incrementAndGet(),
-            crd.nodeId(),
-            lsnr);
-
-        verFuts.put(fut.id, fut);
-
-        try {
-            ctx.io().sendToGridTopic(crd.nodeId(),
-                MSG_TOPIC,
-                new CoordinatorTxCounterRequest(fut.id, txVer),
-                MSG_POLICY);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onError(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param mvccVer Query version.
-     */
-    public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion 
mvccVer) {
-        assert crd != null;
-
-        long trackCntr = mvccVer.counter();
-
-        MvccLongList txs = mvccVer.activeTransactions();
-
-        if (txs != null) {
-            for (int i = 0; i < txs.size(); i++) {
-                long txId = txs.get(i);
-
-                if (txId < trackCntr)
-                    trackCntr = txId;
-            }
-        }
-
-        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() 
? new CoordinatorQueryAckRequest(trackCntr) :
-            new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), 
trackCntr);
-
-        try {
-            ctx.io().sendToGridTopic(crd.nodeId(),
-                MSG_TOPIC,
-                msg,
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send query ack, node left [crd=" + crd + 
", msg=" + msg + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + 
msg + ']', e);
-        }
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @return Counter request future.
-     */
-    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestQueryCounter(MvccCoordinator crd) {
-        assert crd != null;
-
-        // TODO IGNITE-3478: special case for local?
-        MvccVersionFuture fut = new 
MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null);
-
-        verFuts.put(fut.id, fut);
-
-        try {
-            ctx.io().sendToGridTopic(crd.nodeId(),
-                MSG_TOPIC,
-                new CoordinatorQueryVersionRequest(fut.id),
-                MSG_POLICY);
-        }
-        catch (IgniteCheckedException e) {
-            if (verFuts.remove(fut.id) != null)
-                fut.onDone(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crdId Coordinator ID.
-     * @param txs Transaction IDs.
-     * @return Future.
-     */
-    public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList 
txs) {
-        assert crdId != null;
-        assert txs != null && txs.size() > 0;
-
-        // TODO IGNITE-3478: special case for local?
-
-        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crdId, false);
-
-        ackFuts.put(fut.id, fut);
-
-        try {
-            ctx.io().sendToGridTopic(crdId,
-                MSG_TOPIC,
-                new CoordinatorWaitTxsRequest(fut.id, txs),
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(); // No need to ack, finish without error.
-        }
-        catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param mvccVer Transaction version.
-     * @return Acknowledge future.
-     */
-    public IgniteInternalFuture<Void> ackTxCommit(UUID crd, 
MvccCoordinatorVersion mvccVer) {
-        assert crd != null;
-        assert mvccVer != null;
-
-        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crd, true);
-
-        ackFuts.put(fut.id, fut);
-
-        try {
-            ctx.io().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(); // No need to ack, finish without error.
-        }
-        catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crdId Coordinator node ID.
-     * @param mvccVer Transaction version.
-     */
-    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
-        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, 
mvccVer.counter());
-
-        msg.skipResponse(true);
-
-        try {
-            ctx.io().sendToGridTopic(crdId,
-                MSG_TOPIC,
-                msg,
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send tx rollback ack, node left [msg=" + 
msg + ", node=" + crdId + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", 
node=" + crdId + ']', e);
-        }
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorTxCounterRequest(UUID nodeId, 
CoordinatorTxCounterRequest msg) {
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null) {
-            if (log.isDebugEnabled())
-                log.debug("Ignore tx counter request processing, node left 
[msg=" + msg + ", node=" + nodeId + ']');
-
-            return;
-        }
-
-        MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), 
msg.futureId());
-
-        if (STAT_CNTRS)
-            statCntrs[0].update(res.size());
-
-        try {
-            ctx.io().sendToGridTopic(node,
-                MSG_TOPIC,
-                res,
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send tx counter response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send tx counter response [msg=" + msg + ", 
node=" + nodeId + ']', e);
-        }
-    }
-
-    /**
-     *
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorQueryVersionRequest(UUID nodeId, 
CoordinatorQueryVersionRequest msg) {
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null) {
-            if (log.isDebugEnabled())
-                log.debug("Ignore query counter request processing, node left 
[msg=" + msg + ", node=" + nodeId + ']');
-
-            return;
-        }
-
-        MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, 
msg.futureId());
-
-        try {
-            ctx.io().sendToGridTopic(node,
-                MSG_TOPIC,
-                res,
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send query counter response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
-
-            onNodeFailed(nodeId);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send query counter response [msg=" + msg + 
", node=" + nodeId + ']', e);
-
-            onQueryDone(res.counter());
-        }
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorVersionResponse(UUID nodeId, 
MvccCoordinatorVersionResponse msg) {
-        MvccVersionFuture fut = verFuts.remove(msg.futureId());
-
-        if (fut != null) {
-            if (STAT_CNTRS)
-                statCntrs[1].update((System.nanoTime() - fut.startTime) * 
1000);
-
-            fut.onResponse(msg);
-        }
-        else {
-            if (ctx.discovery().alive(nodeId))
-                U.warn(log, "Failed to find query version future [node=" + 
nodeId + ", msg=" + msg + ']');
-            else if (log.isDebugEnabled())
-                log.debug("Failed to find query version future [node=" + 
nodeId + ", msg=" + msg + ']');
-        }
-    }
-
-    /**
-     * @param msg Message.
-     */
-    private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest 
msg) {
-        onQueryDone(msg.counter());
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param msg Message.
-     */
-    private void processNewCoordinatorQueryAckRequest(UUID nodeId, 
NewCoordinatorQueryAckRequest msg) {
-        prevCrdQueries.onQueryDone(nodeId, msg);
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorTxAckRequest(UUID nodeId, 
CoordinatorTxAckRequest msg) {
-        onTxDone(msg.txCounter());
-
-        if (STAT_CNTRS)
-            statCntrs[2].update();
-
-        if (!msg.skipResponse()) {
-            try {
-                ctx.io().sendToGridTopic(nodeId,
-                    MSG_TOPIC,
-                    new CoordinatorFutureResponse(msg.futureId()),
-                    MSG_POLICY);
-            }
-            catch (ClusterTopologyCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send tx ack response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send tx ack response [msg=" + msg + ", 
node=" + nodeId + ']', e);
-            }
-        }
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorAckResponse(UUID nodeId, 
CoordinatorFutureResponse msg) {
-        WaitAckFuture fut = ackFuts.remove(msg.futureId());
-
-        if (fut != null) {
-            if (STAT_CNTRS) {
-                StatCounter cntr = fut.ackTx ? statCntrs[3] : statCntrs[6];
-
-                cntr.update((System.nanoTime() - fut.startTime) * 1000);
-            }
-
-            fut.onResponse();
-        }
-        else {
-            if (ctx.discovery().alive(nodeId))
-                U.warn(log, "Failed to find tx ack future [node=" + nodeId + 
", msg=" + msg + ']');
-            else if (log.isDebugEnabled())
-                log.debug("Failed to find tx ack future [node=" + nodeId + ", 
msg=" + msg + ']');
-        }
-    }
-
-    /**
-     * @param txId Transaction ID.
-     * @return Counter.
-     */
-    private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion 
txId, long futId) {
-        assert crdVer != 0;
-
-        long nextCtr = mvccCntr.incrementAndGet();
-
-        // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
-        MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
-
-        for (Long txVer : activeTxs.keySet())
-            res.addTx(txVer);
-
-        Object old = activeTxs.put(nextCtr, txId);
-
-        assert old == null : txId;
-
-        long cleanupVer;
-
-        if (prevCrdQueries.previousQueriesDone()) {
-            cleanupVer = committedCntr.get() - 1;
-
-            for (Long qryVer : activeQueries.keySet()) {
-                if (qryVer <= cleanupVer)
-                    cleanupVer = qryVer - 1;
-            }
-        }
-        else
-            cleanupVer = -1;
-
-        res.init(futId, crdVer, nextCtr, cleanupVer);
-
-        return res;
-    }
-
-    /**
-     * @param txCntr Counter assigned to transaction.
-     */
-    private void onTxDone(Long txCntr) {
-        GridFutureAdapter fut; // TODO IGNITE-3478.
-
-        GridCacheVersion ver = activeTxs.remove(txCntr);
-
-        assert ver != null;
-
-        committedCntr.setIfGreater(txCntr);
-
-        fut = waitTxFuts.remove(txCntr);
-
-        if (fut != null)
-            fut.onDone();
-    }
-
-    static boolean increment(AtomicInteger cntr) {
-        for (;;) {
-            int current = cntr.get();
-
-            if (current == 0)
-                return false;
-
-            if (cntr.compareAndSet(current, current + 1))
-                return true;
-        }
-    }
-
-    /**
-     * @param qryNodeId Node initiated query.
-     * @return Counter for query.
-     */
-    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, 
long futId) {
-        assert crdVer != 0;
-
-        MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
-
-        Long mvccCntr;
-
-        for(;;) {
-            mvccCntr = committedCntr.get();
-
-            Long trackCntr = mvccCntr;
-
-            for (Long txVer : activeTxs.keySet()) {
-                if (txVer < trackCntr)
-                    trackCntr = txVer;
-
-                res.addTx(txVer);
-            }
-
-            registerActiveQuery(trackCntr);
-
-            if (committedCntr.get() == mvccCntr)
-                break;
-            else {
-                res.resetTransactionsCount();
-
-                onQueryDone(trackCntr);
-            }
-        }
-
-        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
-
-        return res;
-    }
-
-    private void registerActiveQuery(Long cntr) {
-        for (;;) {
-            AtomicInteger qryCnt = activeQueries.get(cntr);
-
-            if (qryCnt != null) {
-                boolean inc = increment(qryCnt);
-
-                if (!inc) {
-                    activeQueries.remove(mvccCntr, qryCnt);
-
-                    continue;
-                }
-            }
-            else {
-                qryCnt = new AtomicInteger(1);
-
-                if (activeQueries.putIfAbsent(cntr, qryCnt) != null)
-                    continue;
-            }
-
-            break;
-        }
-    }
-
-    private void onNodeFailed(UUID nodeId) {
-        // TODO
-    }
-
-    /**
-     * @param mvccCntr Query counter.
-     */
-    private void onQueryDone(long mvccCntr) {
-        AtomicInteger cntr = activeQueries.get(mvccCntr);
-
-        assert cntr != null : mvccCntr;
-
-        int left = cntr.decrementAndGet();
-
-        assert left >= 0 : left;
-
-        if (left == 0) {
-            boolean rmv = activeQueries.remove(mvccCntr, cntr);
-
-            assert rmv;
-        }
-    }
-
-    /**
-     * @param msg Message.
-     */
-    private void processCoordinatorWaitTxsRequest(final UUID nodeId, final 
CoordinatorWaitTxsRequest msg) {
-        statCntrs[5].update();
-
-        GridLongList txs = msg.transactions();
-
-        GridCompoundFuture resFut = null;
-
-        for (int i = 0; i < txs.size(); i++) {
-            Long txId = txs.get(i);
-
-            WaitTxFuture fut = waitTxFuts.get(txId);
-
-            if (fut == null) {
-                WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new 
WaitTxFuture(txId));
-
-                if (old != null)
-                    fut = old;
-            }
-
-            if (!activeTxs.containsKey(txId))
-                fut.onDone();
-
-            if (!fut.isDone()) {
-                if (resFut == null)
-                    resFut = new GridCompoundFuture();
-
-                resFut.add(fut);
-            }
-        }
-
-        if (resFut != null)
-            resFut.markInitialized();
-
-        if (resFut == null || resFut.isDone())
-            sendFutureResponse(nodeId, msg);
-        else {
-            resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
-                @Override public void apply(IgniteInternalFuture fut) {
-                    sendFutureResponse(nodeId, msg);
-                }
-            });
-        }
-    }
-
-    /**
-     * @param nodeId
-     * @param msg
-     */
-    private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest 
msg) {
-        try {
-            ctx.io().sendToGridTopic(nodeId,
-                MSG_TOPIC,
-                new CoordinatorFutureResponse(msg.futureId()),
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send tx ack response, node left [msg=" + 
msg + ", node=" + nodeId + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send tx ack response [msg=" + msg + ", 
node=" + nodeId + ']', e);
-        }
-    }
-
-    public MvccCoordinator currentCoordinator() {
-        return curCrd;
-    }
-
-    public UUID currentCoordinatorId() {
-        MvccCoordinator curCrd = this.curCrd;
-
-        return curCrd != null ? curCrd.nodeId() : null;
-    }
-
-    /**
-     * @param topVer Cache affinity version (used for assert).
-     * @return Coordinator.
-     */
-    public MvccCoordinator 
currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
-        MvccCoordinator crd = curCrd;
-
-        // Assert coordinator did not already change.
-        assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 :
-            "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']';
-
-        return crd;
-    }
-
-    /**
-     * @param discoCache Discovery snapshot.
-     * @return New coordinator.
-     */
-    public MvccCoordinator reassignCoordinator(DiscoCache discoCache) {
-        assert curCrd == null || 
!F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd;
-
-        if (!discoCache.serverNodes().isEmpty()) {
-            ClusterNode node = discoCache.serverNodes().get(0);
-
-            curCrd = new MvccCoordinator(node.id(),
-                discoCache.version().topologyVersion(),
-                discoCache.version());
-
-            log.info("Assigned mvcc coordinator: " + curCrd);
-        }
-        else {
-            curCrd = null;
-
-            log.info("New mvcc coordinator was not assigned [topVer=" + 
discoCache.version() + ']');
-        }
-
-        return curCrd;
-    }
-
-    /**
-     * @param nodeId Node ID
-     * @param activeQueries
-     */
-    public void processClientActiveQueries(UUID nodeId,
-        @Nullable Map<MvccCounter, Integer> activeQueries) {
-        prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @param activeQueries Current queries.
-     */
-    public void initCoordinator(AffinityTopologyVersion topVer,
-        DiscoCache discoCache,
-        Map<UUID, Map<MvccCounter, Integer>> activeQueries)
-    {
-        assert ctx.localNodeId().equals(curCrd.nodeId());
-
-        log.info("Initialize local node as mvcc coordinator [node=" + 
ctx.localNodeId() +
-            ", topVer=" + topVer + ']');
-
-        crdVer = topVer.topologyVersion();
-
-        prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
-
-        crdLatch.countDown();
-    }
-
-    /**
-     *
-     */
-    public class MvccVersionFuture extends 
GridFutureAdapter<MvccCoordinatorVersion> {
-        /** */
-        private final Long id;
-
-        /** */
-        private MvccResponseListener lsnr;
-
-        /** */
-        public final UUID crdId;
-
-        /** */
-        long startTime;
-
-        /**
-         * @param id Future ID.
-         * @param crdId Coordinator node ID.
-         */
-        MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener 
lsnr) {
-            this.id = id;
-            this.crdId = crdId;
-            this.lsnr = lsnr;
-
-            if (STAT_CNTRS)
-                startTime = System.nanoTime();
-        }
-
-        /**
-         * @param res Response.
-         */
-        void onResponse(MvccCoordinatorVersionResponse res) {
-            assert res.counter() != COUNTER_NA;
-
-            if (lsnr != null)
-                lsnr.onMvccResponse(crdId, res);
-
-            onDone(res);
-        }
-
-        void onError(IgniteCheckedException err) {
-            if (verFuts.remove(id) != null) {
-                if (lsnr != null)
-                    lsnr.onMvccError(err);
-
-                onDone(err);
-            }
-        }
-
-        /**
-         * @param nodeId Failed node ID.
-         */
-        void onNodeLeft(UUID nodeId) {
-            if (crdId.equals(nodeId)) {
-                ClusterTopologyCheckedException err = new 
ClusterTopologyCheckedException("Failed to request coordinator version, " +
-                    "coordinator failed: " + nodeId);
-
-                onError(err);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
-        }
-    }
-
-    /**
-     *
-     */
-    private class WaitAckFuture extends GridFutureAdapter<Void> {
-        /** */
-        private final long id;
-
-        /** */
-        private final UUID crdId;
-
-        /** */
-        long startTime;
-
-        /** */
-        final boolean ackTx;
-
-        /**
-         * @param id Future ID.
-         * @param crdId Coordinator node ID.
-         */
-        WaitAckFuture(long id, UUID crdId, boolean ackTx) {
-            this.id = id;
-            this.crdId = crdId;
-            this.ackTx = ackTx;
-
-            if (STAT_CNTRS)
-                startTime = System.nanoTime();
-        }
-
-        /**
-         *
-         */
-        void onResponse() {
-            onDone();
-        }
-
-        /**
-         * @param nodeId Failed node ID.
-         */
-        void onNodeLeft(UUID nodeId) {
-            if (crdId.equals(nodeId) && verFuts.remove(id) != null)
-                onDone();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "WaitAckFuture [crdId=" + crdId + ", id=" + id + ']';
-        }
-    }
-
-    /**
-     *
-     */
-    private class CacheCoordinatorNodeFailListener implements 
GridLocalEventListener {
-        /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
-            assert evt instanceof DiscoveryEvent : evt;
-
-            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
-            UUID nodeId = discoEvt.eventNode().id();
-
-            for (MvccVersionFuture fut : verFuts.values())
-                fut.onNodeLeft(nodeId);
-
-            for (WaitAckFuture fut : ackFuts.values())
-                fut.onNodeLeft(nodeId);
-
-            prevCrdQueries.onNodeLeft(nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "CacheCoordinatorDiscoveryListener[]";
-        }
-    }
-    /**
-     *
-     */
-    private class CoordinatorMessageListener implements GridMessageListener {
-        /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
-            if (STAT_CNTRS)
-                statCntrs[4].update();
-
-            MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg;
-
-            if (msg0.waitForCoordinatorInit()) {
-                if (crdVer == 0) {
-                    try {
-                        U.await(crdLatch);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        U.warn(log, "Failed to wait for coordinator 
initialization, thread interrupted [" +
-                            "msgNode=" + nodeId + ", msg=" + msg + ']');
-
-                        return;
-                    }
-
-                    assert crdVer != 0L;
-                }
-            }
-
-            if (msg instanceof CoordinatorTxCounterRequest)
-                processCoordinatorTxCounterRequest(nodeId, 
(CoordinatorTxCounterRequest)msg);
-            else if (msg instanceof CoordinatorTxAckRequest)
-                processCoordinatorTxAckRequest(nodeId, 
(CoordinatorTxAckRequest)msg);
-            else if (msg instanceof CoordinatorFutureResponse)
-                processCoordinatorAckResponse(nodeId, 
(CoordinatorFutureResponse)msg);
-            else if (msg instanceof CoordinatorQueryAckRequest)
-                
processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
-            else if (msg instanceof CoordinatorQueryVersionRequest)
-                processCoordinatorQueryVersionRequest(nodeId, 
(CoordinatorQueryVersionRequest)msg);
-            else if (msg instanceof MvccCoordinatorVersionResponse)
-                processCoordinatorVersionResponse(nodeId, 
(MvccCoordinatorVersionResponse) msg);
-            else if (msg instanceof CoordinatorWaitTxsRequest)
-                processCoordinatorWaitTxsRequest(nodeId, 
(CoordinatorWaitTxsRequest)msg);
-            else if (msg instanceof NewCoordinatorQueryAckRequest)
-                processNewCoordinatorQueryAckRequest(nodeId, 
(NewCoordinatorQueryAckRequest)msg);
-            else
-                U.warn(log, "Unexpected message received [node=" + nodeId + ", 
msg=" + msg + ']');
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "CoordinatorMessageListener[]";
-        }
-    }
-    /**
-     *
-     */
-    static class StatCounter {
-        /** */
-        final String name;
-
-        /** */
-        final LongAdder8 cntr = new LongAdder8();
-
-        public StatCounter(String name) {
-            this.name = name;
-        }
-
-        void update() {
-            cntr.increment();
-        }
-
-        void update(GridLongList arg) {
-            throw new UnsupportedOperationException();
-        }
-
-        void update(long arg) {
-            throw new UnsupportedOperationException();
-        }
-
-        void dumpInfo(IgniteLogger log) {
-            long totalCnt = cntr.sumThenReset();
-
-            if (totalCnt > 0)
-                log.info(name + " [cnt=" + totalCnt + ']');
-        }
-    }
-
-    /**
-     *
-     */
-    static class CounterWithAvg extends StatCounter {
-        /** */
-        final LongAdder8 total = new LongAdder8();
-
-        /** */
-        final String avgName;
-
-        CounterWithAvg(String name, String avgName) {
-            super(name);
-
-            this.avgName = avgName;
-        }
-
-        @Override void update(GridLongList arg) {
-            update(arg != null ? arg.size() : 0);
-        }
-
-        @Override void update(long add) {
-            cntr.increment();
-
-            total.add(add);
-        }
-
-        void dumpInfo(IgniteLogger log) {
-            long totalCnt = cntr.sumThenReset();
-            long totalSum = total.sumThenReset();
-
-            if (totalCnt > 0)
-                log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + 
((float)totalSum / totalCnt) + ']');
-        }
-    }
-
-    /**
-     *
-     */
-    private static class WaitTxFuture extends GridFutureAdapter {
-        /** */
-        private final long txId;
-
-        /**
-         * @param txId Transaction ID.
-         */
-        WaitTxFuture(long txId) {
-            this.txId = txId;
-        }
-    }
-}

Reply via email to