Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 331a255cd -> 537a3ecb2


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/537a3ecb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/537a3ecb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/537a3ecb

Branch: refs/heads/ignite-3479
Commit: 537a3ecb2218c8676ac14886b7aa04081b975f7c
Parents: 331a255
Author: sboikov <[email protected]>
Authored: Wed Sep 27 12:21:14 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Sep 27 15:10:40 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  12 ++
 .../affinity/GridAffinityAssignmentCache.java   |   4 +-
 .../cache/CacheAffinitySharedManager.java       |  17 +-
 .../processors/cache/ExchangeContext.java       |  33 ++--
 .../dht/GridPartitionedGetFuture.java           |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        |  18 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 118 ++++++++++---
 .../processors/cache/mvcc/MvccCoordinator.java  |  48 ++++--
 .../processors/cache/mvcc/MvccCounter.java      |  73 +++++++-
 .../mvcc/NewCoordinatorQueryAckRequest.java     | 156 +++++++++++++++++
 .../cache/mvcc/PreviousCoordinatorQueries.java  | 170 +++++++++++++++++++
 .../query/GridCacheDistributedQueryManager.java |   7 +-
 .../cache/query/GridCacheQueryManager.java      |  13 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  65 +++++--
 14 files changed, 639 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 22d2779..99bc8af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -110,6 +110,8 @@ import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureRespons
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
+import 
org.apache.ignite.internal.processors.cache.mvcc.NewCoordinatorQueryAckRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
@@ -929,6 +931,16 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 return msg;
 
+            case 140:
+                msg = new NewCoordinatorQueryAckRequest();
+
+                return msg;
+
+            case 141:
+                msg = new MvccCounter();
+
+                return msg;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 83837b8..fb4092a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -195,10 +195,12 @@ public class GridAffinityAssignmentCache {
      *
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment for topology version.
+     * @param mvccCrd Mvcc coordinator.
      */
-    private void initialize(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) {
+    public void initialize(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) {
         assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + 
", last=" + lastVersion() + ']';
         assert idealAssignment != null;
+        assert mvccCrd == null || topVer.compareTo(mvccCrd.topologyVersion()) 
>= 0 : "[mvccCrd=" + mvccCrd + ", topVer=" + topVer + ']';
 
         GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, 
affAssignment, idealAssignment, mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 99727e6..1f9890c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -57,6 +57,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridLongList;
@@ -499,6 +500,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 assert grp != null;
 
                 GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer,
+                    cctx.coordinators().currentCoordinator(),
                     null,
                     discoCache,
                     grp.affinity(),
@@ -1188,6 +1190,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             fetchFut.init(false);
 
             fetchAffinity(evts.topologyVersion(),
+                cctx.coordinators().currentCoordinator(),
                 evts.lastEvent(),
                 evts.discoveryCache(),
                 aff, fetchFut);
@@ -1536,6 +1539,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             int grpId = fetchFut.groupId();
 
             fetchAffinity(topVer,
+                cctx.coordinators().currentCoordinator(),
                 fut.events().lastEvent(),
                 fut.events().discoveryCache(),
                 cctx.cache().cacheGroup(grpId).affinity(),
@@ -1545,6 +1549,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
     /**
      * @param topVer Topology version.
+     * @param mvccCrd Mvcc coordinator to set in affinity.
      * @param discoveryEvt Discovery event.
      * @param discoCache Discovery data cache.
      * @param affCache Affinity.
@@ -1552,7 +1557,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      * @return Affinity assignment response.
      */
-    private GridDhtAffinityAssignmentResponse 
fetchAffinity(AffinityTopologyVersion topVer,
+    private GridDhtAffinityAssignmentResponse fetchAffinity(
+        AffinityTopologyVersion topVer,
+        MvccCoordinator mvccCrd,
         @Nullable DiscoveryEvent discoveryEvt,
         DiscoCache discoCache,
         GridAffinityAssignmentCache affCache,
@@ -1565,7 +1572,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         if (res == null) {
             List<List<ClusterNode>> aff = affCache.calculate(topVer, 
discoveryEvt, discoCache);
 
-            affCache.initialize(topVer, aff);
+            affCache.initialize(topVer, aff, mvccCrd);
         }
         else {
             List<List<ClusterNode>> idealAff = 
res.idealAffinityAssignment(discoCache);
@@ -1582,7 +1589,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
             assert aff != null : res;
 
-            affCache.initialize(topVer, aff);
+            affCache.initialize(topVer, aff, mvccCrd);
         }
 
         return res;
@@ -1632,7 +1639,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      * @return Future completed when caches initialization is done.
      */
-    public IgniteInternalFuture<?> initCoordinatorCaches(final 
GridDhtPartitionsExchangeFuture fut,
+    public IgniteInternalFuture<?> initCoordinatorCaches(
+        final GridDhtPartitionsExchangeFuture fut,
         final boolean newAff) throws IgniteCheckedException {
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new 
ArrayList<>();
 
@@ -1700,6 +1708,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                             @Override public void 
applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
                                 throws IgniteCheckedException {
                                 fetchAffinity(prev.topologyVersion(),
+                                    null, // Pass null mvcc coordinator, this 
affinity version should be used for queries.
                                     prev.events().lastEvent(),
                                     prev.events().discoveryCache(),
                                     aff,

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 36ce6ef..67bf9ce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -56,8 +58,8 @@ public class ExchangeContext {
     /** */
     private final boolean newMvccCrd;
 
-    /** */
-    private Map<MvccCounter, Integer> activeQrys;
+    /** Currently running mvcc queries, initialized when mvcc coordinator is 
changed. */
+    private Map<UUID, Map<MvccCounter, Integer>> activeQueries;
 
     /**
      * @param crd Coordinator flag.
@@ -71,7 +73,7 @@ public class ExchangeContext {
         if (compatibilityNode || (crd && fut.localJoinExchange())) {
             fetchAffOnJoin = true;
 
-            merge = !newMvccCrd;
+            merge = false;
         }
         else {
             boolean startCaches = fut.exchangeId().isJoined() &&
@@ -79,8 +81,7 @@ public class ExchangeContext {
 
             fetchAffOnJoin = protocolVer == 1;
 
-            merge = !newMvccCrd &&
-                !startCaches &&
+            merge = !startCaches &&
                 protocolVer > 1 &&
                 fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
         }
@@ -142,26 +143,18 @@ public class ExchangeContext {
         return newMvccCrd;
     }
 
-    public Map<MvccCounter, Integer> activeQueries() {
-        return activeQrys;
+    public Map<UUID, Map<MvccCounter, Integer>> activeQueries() {
+        return activeQueries;
     }
 
-    public void addActiveQueries(Map<MvccCounter, Integer> activeQrys0) {
-        if (activeQrys0 == null)
+    public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, 
Integer> activeQueries0) {
+        if (activeQueries0 == null)
             return;
 
-        if (activeQrys != null) {
-            for (Map.Entry<MvccCounter, Integer> e : activeQrys0.entrySet()) {
-                Integer cnt = activeQrys.get(e.getKey());
+        if (activeQueries == null)
+            activeQueries = new HashMap<>();
 
-                if (cnt == null)
-                    activeQrys.put(e.getKey(), e.getValue());
-                else
-                    activeQrys.put(e.getKey(), cnt + e.getValue());
-            }
-        }
-        else
-            activeQrys = activeQrys0;
+        activeQueries.put(nodeId, activeQueries0);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 1476b2a..37e9feb6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -335,7 +335,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
                 }
 
                 if (mvccVer0 != null)
-                    cctx.shared().coordinators().ackQueryDone(mvccCrd0.node(), 
mvccVer0);
+                    cctx.shared().coordinators().ackQueryDone(mvccCrd0, 
mvccVer0);
             }
 
             cache().sendTtlUpdateRequest(expiryPlc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1642263..51da7a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -841,7 +841,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
-            exchCtx.addActiveQueries(activeQrys);
+            exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys);
         }
     }
 
@@ -1304,7 +1304,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 msg.partitionHistoryCounters(partHistReserved0);
         }
 
-        msg.activeQueries(exchCtx.activeQueries());
+        Map<UUID, Map<MvccCounter, Integer>> activeQueries = 
exchCtx.activeQueries();
+
+        msg.activeQueries(activeQueries != null ? 
activeQueries.get(cctx.localNodeId()) : null);
 
         if (stateChangeExchange() && changeGlobalStateE != null)
             msg.setError(changeGlobalStateE);
@@ -1482,7 +1484,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         if (err == null) {
             if (exchCtx.newMvccCoordinator() && 
cctx.localNode().equals(cctx.coordinators().currentCoordinatorNode()))
-                cctx.coordinators().initCoordinator(res, 
exchCtx.activeQueries());
+                cctx.coordinators().initCoordinator(res, 
exchCtx.events().discoveryCache(), exchCtx.activeQueries());
 
             if (centralizedAff) {
                 assert !exchCtx.mergeExchanges();
@@ -1904,6 +1906,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private void processSingleMessage(UUID nodeId, 
GridDhtPartitionsSingleMessage msg) {
         if (msg.client()) {
+            if (msg.activeQueries() != null)
+                cctx.coordinators().processClientActiveQueries(nodeId, 
msg.activeQueries());
+
             waitAndReplyToNode(nodeId, msg);
 
             return;
@@ -2252,7 +2257,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (exchCtx.mergeExchanges()) {
+            if (exchCtx.mergeExchanges() && !exchCtx.newMvccCoordinator()) {
                 if (log.isInfoEnabled())
                     log.info("Coordinator received all messages, try merge 
[ver=" + initialVersion() + ']');
 
@@ -2324,7 +2329,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
msgs.entrySet()) {
                 GridDhtPartitionsSingleMessage msg = e.getValue();
 
-                exchCtx.addActiveQueries(msg.activeQueries());
+                if (exchCtx.newMvccCoordinator())
+                    exchCtx.addActiveQueries(e.getKey(), msg.activeQueries());
+                else
+                    assert msg.activeQueries() == null;
 
                 // Apply update counters after all single messages are 
received.
                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : 
msg.partitions().entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 2bf653c..f144437 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
 
@@ -87,6 +88,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     private final ConcurrentMap<Long, AtomicInteger> activeQueries = new 
ConcurrentHashMap<>();
 
     /** */
+    private final PreviousCoordinatorQueries prevCrdQueries = new 
PreviousCoordinatorQueries();
+
+    /** */
     private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new 
ConcurrentHashMap<>();
 
     /** */
@@ -138,7 +142,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
         statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", 
"avgFutTime");
 
-        cctx.gridEvents().addLocalEventListener(new 
CacheCoordinatorDiscoveryListener(),
+        cctx.gridEvents().addLocalEventListener(new 
CacheCoordinatorNodeFailListener(),
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
         cctx.gridIO().addMessageListener(MSG_TOPIC, new 
CoordinatorMessageListener());
@@ -169,9 +173,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     /**
      * @param crd Coordinator.
      * @param lsnr Response listener.
+     * @param txVer Transaction version.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestTxCounter(ClusterNode crd, MvccResponseListener lsnr, GridCacheVersion 
txVer) {
+    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestTxCounter(ClusterNode crd,
+        MvccResponseListener lsnr,
+        GridCacheVersion txVer) {
         assert !crd.isLocal() : crd;
 
         MvccVersionFuture fut = new 
MvccVersionFuture(futIdCntr.incrementAndGet(),
@@ -197,32 +204,37 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
      * @param crd Coordinator.
      * @param mvccVer Query version.
      */
-    public void ackQueryDone(ClusterNode crd, MvccCoordinatorVersion mvccVer) {
-        try {
-            long trackCntr = mvccVer.counter();
+    public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion 
mvccVer) {
+        assert crd != null;
 
-            MvccLongList txs = mvccVer.activeTransactions();
+        long trackCntr = mvccVer.counter();
 
-            if (txs != null) {
-                for (int i = 0; i < txs.size(); i++) {
-                    long txId = txs.get(i);
+        MvccLongList txs = mvccVer.activeTransactions();
 
-                    if (txId < trackCntr)
-                        trackCntr = txId;
-                }
+        if (txs != null) {
+            for (int i = 0; i < txs.size(); i++) {
+                long txId = txs.get(i);
+
+                if (txId < trackCntr)
+                    trackCntr = txId;
             }
+        }
 
-            cctx.gridIO().sendToGridTopic(crd,
+        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() 
? new CoordinatorQueryAckRequest(trackCntr) :
+            new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), 
trackCntr);
+
+        try {
+            cctx.gridIO().sendToGridTopic(crd.node(),
                 MSG_TOPIC,
-                new CoordinatorQueryAckRequest(trackCntr),
+                msg,
                 MSG_POLICY);
         }
         catch (ClusterTopologyCheckedException e) {
             if (log.isDebugEnabled())
-                log.debug("Failed to send query ack, node left [crd=" + crd + 
']');
+                log.debug("Failed to send query ack, node left [crd=" + crd + 
", msg=" + msg + ']');
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + 
mvccVer + ']', e);
+            U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + 
msg + ']', e);
         }
     }
 
@@ -401,7 +413,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             if (log.isDebugEnabled())
                 log.debug("Failed to send query counter response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
 
-            onQueryDone(res.counter());
+            onNodeFailed(nodeId);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send query counter response [msg=" + msg + 
", node=" + nodeId + ']', e);
@@ -439,6 +451,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void processNewCoordinatorQueryAckRequest(UUID nodeId, 
NewCoordinatorQueryAckRequest msg) {
+        prevCrdQueries.onQueryDone(nodeId, msg);
+    }
+
+    /**
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
@@ -508,12 +528,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
         assert old == null : txId;
 
-        long cleanupVer = committedCntr.get() - 1;
+        long cleanupVer;
+
+        if (prevCrdQueries.previousQueriesDone()) {
+            cleanupVer = committedCntr.get() - 1;
 
-        for (Long qryVer : activeQueries.keySet()) {
-            if (qryVer <= cleanupVer)
-                cleanupVer = qryVer - 1;
+            for (Long qryVer : activeQueries.keySet()) {
+                if (qryVer <= cleanupVer)
+                    cleanupVer = qryVer - 1;
+            }
         }
+        else
+            cleanupVer = -1;
 
         res.init(futId, crdVer, nextCtr, cleanupVer);
 
@@ -613,6 +639,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         }
     }
 
+    private void onNodeFailed(UUID nodeId) {
+        // TODO
+    }
+
     /**
      * @param mvccCntr Query counter.
      */
@@ -709,34 +739,60 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         return curCrd != null ? curCrd.node() : null;
     }
 
+    /**
+     * @param topVer Cache affinity version (used for assert).
+     * @return Coordinator.
+     */
     public MvccCoordinator 
currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
         MvccCoordinator crd = curCrd;
 
         // Assert coordinator did not already change.
-        assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 : 
crd;
+        assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 :
+            "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']';
 
         return crd;
     }
 
     /**
      * @param discoCache Discovery snapshot.
+     * @return New coordinator.
      */
     public MvccCoordinator reassignCoordinator(DiscoCache discoCache) {
         assert curCrd == null || 
!discoCache.allNodes().contains(curCrd.node()) : curCrd;
 
         if (!discoCache.serverNodes().isEmpty()) {
-            curCrd = new MvccCoordinator(discoCache.serverNodes().get(0), 
discoCache.version());
+            curCrd = new MvccCoordinator(discoCache.serverNodes().get(0),
+                discoCache.version().topologyVersion(),
+                discoCache.version());
 
-            log.info("Assigned mvcc coordinator [topVer=" + 
discoCache.version() +
-                ", crd=" + curCrd.node().id() + ']');
+            log.info("Assigned mvcc coordinator: " + curCrd);
         }
-        else
+        else {
             curCrd = null;
 
+            log.info("New mvcc coordinator was not assigned [topVer=" + 
discoCache.version() + ']');
+        }
+
         return curCrd;
     }
 
-    public void initCoordinator(AffinityTopologyVersion topVer, @Nullable 
Map<MvccCounter, Integer> activeQrys) {
+    /**
+     * @param nodeId Node ID.
+     * @param activeQueries Active queries.
+     */
+    public void processClientActiveQueries(UUID nodeId,
+        @Nullable Map<MvccCounter, Integer> activeQueries) {
+        prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param activeQueries Current queries.
+     */
+    public void initCoordinator(AffinityTopologyVersion topVer,
+        DiscoCache discoCache,
+        Map<UUID, Map<MvccCounter, Integer>> activeQueries)
+    {
         assert cctx.localNode().equals(curCrd.node());
 
         log.info("Initialize local node as mvcc coordinator [node=" + 
cctx.localNodeId() +
@@ -744,6 +800,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
         crdVer = topVer.topologyVersion();
 
+        prevCrdQueries.init(activeQueries, discoCache, cctx.discovery());
+
         crdLatch.countDown();
     }
 
@@ -868,7 +926,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     /**
      *
      */
-    private class CacheCoordinatorDiscoveryListener implements 
GridLocalEventListener {
+    private class CacheCoordinatorNodeFailListener implements 
GridLocalEventListener {
         /** {@inheritDoc} */
         @Override public void onEvent(Event evt) {
             assert evt instanceof DiscoveryEvent : evt;
@@ -882,6 +940,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
             for (WaitAckFuture fut : ackFuts.values())
                 fut.onNodeLeft(nodeId);
+
+            prevCrdQueries.onNodeLeft(nodeId);
         }
 
         /** {@inheritDoc} */
@@ -930,6 +990,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
                 processCoordinatorVersionResponse(nodeId, 
(MvccCoordinatorVersionResponse) msg);
             else if (msg instanceof CoordinatorWaitTxsRequest)
                 processCoordinatorWaitTxsRequest(nodeId, 
(CoordinatorWaitTxsRequest)msg);
+            else if (msg instanceof NewCoordinatorQueryAckRequest)
+                processNewCoordinatorQueryAckRequest(nodeId, 
(NewCoordinatorQueryAckRequest)msg);
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", 
msg=" + msg + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
index 2affc5a..24ff354 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
@@ -27,39 +27,67 @@ public class MvccCoordinator {
     /** */
     private final ClusterNode crd;
 
+    /**
+     * Unique coordinator version. It increases whenever a new coordinator is assigned
+     * and can differ from topVer if the coordinator is assigned manually.
+     */
+    private final long crdVer;
+
     /** */
     private final AffinityTopologyVersion topVer;
 
-    public MvccCoordinator(ClusterNode crd, final AffinityTopologyVersion 
topVer) {
+    /**
+     * @param crd Coordinator node.
+     * @param crdVer Coordinator version.
+     * @param topVer Topology version when coordinator was assigned.
+     */
+    public MvccCoordinator(ClusterNode crd, long crdVer, 
AffinityTopologyVersion topVer) {
         this.crd = crd;
+        this.crdVer = crdVer;
         this.topVer = topVer;
     }
 
+    /**
+     * @return Unique coordinator version.
+     */
+    public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /**
+     * @return Coordinator node.
+     */
     public ClusterNode node() {
         return crd;
     }
 
+    /**
+     * @return Topology version when coordinator was assigned.
+     */
     public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
-    @Override public boolean equals(Object other) {
-        if (this == other)
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
             return true;
 
-        if (other == null || getClass() != other.getClass())
+        if (o == null || getClass() != o.getClass())
             return false;
 
-        MvccCoordinator that = (MvccCoordinator)other;
+        MvccCoordinator that = (MvccCoordinator)o;
 
-        return topVer.equals(topVer) && crd.equals(that.crd);
+        return crdVer == that.crdVer;
     }
 
+    /** {@inheritDoc} */
     @Override public int hashCode() {
-        int res = crd.hashCode();
-
-        res = 31 * res + topVer.hashCode();
+        return (int)(crdVer ^ (crdVer >>> 32));
+    }
 
-        return res;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "MvccCoordinator [node=" + crd.id() + ", ver=" + crdVer + ", 
topVer=" + topVer + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
index 847822e..bec3301 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -32,19 +33,32 @@ public class MvccCounter implements Message {
     /** */
     private long cntr;
 
+    /**
+     *
+     */
     public MvccCounter() {
         // No-po.
     }
 
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
+     */
     public MvccCounter(long crdVer, long cntr) {
         this.crdVer = crdVer;
         this.cntr = cntr;
     }
 
+    /**
+     * @return Coordinator version.
+     */
     public long coordinatorVersion() {
         return crdVer;
     }
 
+    /**
+     * @return Counter.
+     */
     public long counter() {
         return cntr;
     }
@@ -71,22 +85,70 @@ public class MvccCounter implements Message {
 
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        return false;
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
     }
 
     /** {@inheritDoc} */
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        return false;
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccCounter.class);
     }
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return 0;
+        return 141;
     }
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 0;
+        return 2;
     }
 
     /** {@inheritDoc} */
@@ -94,7 +156,8 @@ public class MvccCounter implements Message {
         // No-op.
     }
 
+    /** {@inheritDoc} */
     @Override public String toString() {
-        return super.toString();
+        return S.toString(MvccCounter.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
new file mode 100644
index 0000000..40b8e01
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class NewCoordinatorQueryAckRequest implements MvccCoordinatorMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long crdVer;
+
+    /** */
+    private long cntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public NewCoordinatorQueryAckRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Query counter.
+     */
+    NewCoordinatorQueryAckRequest(long crdVer, long cntr) {
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Coordinator version.
+     */
+    public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(NewCoordinatorQueryAckRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 140;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(NewCoordinatorQueryAckRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
new file mode 100644
index 0000000..dfe584e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class PreviousCoordinatorQueries {
+    /** */
+    private volatile boolean prevQueriesDone;
+
+    /** */
+    private final ConcurrentHashMap<UUID, Map<MvccCounter, Integer>> 
activeQueries = new ConcurrentHashMap<>();
+
+    /** */
+    private Set<UUID> rcvd;
+
+    /** */
+    private Set<UUID> waitNodes;
+
+    /** */
+    private boolean initDone;
+
+    void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache 
discoCache, GridDiscoveryManager mgr) {
+        synchronized (this) {
+            assert !initDone;
+            assert waitNodes == null;
+
+            waitNodes = new HashSet<>();
+
+            for (ClusterNode node : discoCache.allNodes()) {
+                if (CU.clientNode(node) && mgr.alive(node) && 
!F.contains(rcvd, node.id()))
+                    waitNodes.add(node.id());
+            }
+
+            initDone = waitNodes.isEmpty();
+
+            if (srvNodesQueries != null) {
+                for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : 
srvNodesQueries.entrySet())
+                    addAwaitedActiveQueries(e.getKey(), e.getValue());
+            }
+
+            if (initDone)
+                prevQueriesDone = activeQueries.isEmpty();
+        }
+    }
+
+    boolean previousQueriesDone() {
+        return prevQueriesDone;
+    }
+
+    private void addAwaitedActiveQueries(UUID nodeId, Map<MvccCounter, 
Integer> nodeQueries) {
+        if (nodeQueries == null || prevQueriesDone)
+            return;
+
+        Map<MvccCounter, Integer> queries = activeQueries.get(nodeId);
+
+        if (queries == null)
+            activeQueries.put(nodeId, nodeQueries);
+        else {
+            for (Map.Entry<MvccCounter, Integer> e : nodeQueries.entrySet()) {
+                Integer qryCnt = queries.get(e.getKey());
+
+                int newQryCnt = (qryCnt == null ? 0 : qryCnt) + e.getValue();
+
+                if (newQryCnt == 0) {
+                    queries.remove(e.getKey());
+
+                    if (queries.isEmpty())
+                        activeQueries.remove(nodeId);
+                }
+                else
+                    queries.put(e.getKey(), newQryCnt);
+            }
+        }
+
+        prevQueriesDone = activeQueries.isEmpty();
+    }
+
+    void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, 
Integer> activeQueries) {
+        synchronized (this) {
+            if (initDone)
+                return;
+
+            if (waitNodes == null) {
+                if (rcvd == null)
+                    rcvd = new HashSet<>();
+
+                rcvd.add(nodeId);
+            }
+            else
+                initDone = waitNodes.remove(nodeId);
+
+            addAwaitedActiveQueries(nodeId, activeQueries);
+        }
+    }
+
+    /**
+     * @param nodeId Failed node ID.
+     */
+    void onNodeLeft(UUID nodeId) {
+        synchronized (this) {
+            initDone = waitNodes != null && waitNodes.remove(nodeId);
+
+            if (initDone && !prevQueriesDone && activeQueries.remove(nodeId) 
!= null)
+                prevQueriesDone = activeQueries.isEmpty();
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+        synchronized (this) {
+            MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), 
msg.counter());
+
+            Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
+
+            if (nodeQueries == null)
+                activeQueries.put(nodeId, nodeQueries = new HashMap<>());
+
+            Integer qryCnt = nodeQueries.get(cntr);
+
+            int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
+
+            if (initDone) {
+                if (newQryCnt == 0) {
+                    nodeQueries.remove(cntr);
+
+                    if (nodeQueries.isEmpty())
+                        activeQueries.remove(nodeId);
+
+                    prevQueriesDone = activeQueries.isEmpty();
+                }
+            }
+            else
+                nodeQueries.put(cntr, newQryCnt);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index f5df18b..83e846f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
@@ -535,13 +536,13 @@ public class GridCacheDistributedQueryManager<K, V> 
extends GridCacheQueryManage
             String clsName = qry.query().queryClassName();
 
             // TODO IGNITE-3478.
-            final ClusterNode mvccCrd;
+            final MvccCoordinator mvccCrd;
             final MvccCoordinatorVersion mvccVer;
 
             if (cctx.mvccEnabled()) {
-                mvccCrd = 
cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()).node();
+                mvccCrd = 
cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
-                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = 
cctx.shared().coordinators().requestQueryCounter(mvccCrd);
+                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = 
cctx.shared().coordinators().requestQueryCounter(mvccCrd.node());
 
                 mvccVer = fut0.get();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 67bd6e0..dda1e69 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -76,6 +76,7 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -825,7 +826,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @throws IgniteCheckedException If failed to get iterator.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> 
qry, boolean locNode, ClusterNode mvccCrd)
+    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> 
qry, boolean locNode, MvccCoordinator mvccCrd)
         throws IgniteCheckedException {
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
 
@@ -1461,13 +1462,13 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
                     taskName));
             }
 
-            final ClusterNode mvccCrd;
+            final MvccCoordinator mvccCrd;
 
             // TODO IGNITE-3478.
             if (cctx.mvccEnabled()) {
-                mvccCrd = 
cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()).node();
+                mvccCrd = 
cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
-                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = 
cctx.shared().coordinators().requestQueryCounter(mvccCrd);
+                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = 
cctx.shared().coordinators().requestQueryCounter(mvccCrd.node());
 
                 qry.mvccVersion(fut0.get());
             }
@@ -2915,7 +2916,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
         private IgniteCacheExpiryPolicy expiryPlc;
 
         /** */
-        private ClusterNode mvccCrd;
+        private MvccCoordinator mvccCrd;
 
         /** */
         private MvccCoordinatorVersion mvccVer;
@@ -2938,7 +2939,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             IgniteBiPredicate<K, V> scanFilter,
             boolean locNode,
             GridCacheContext cctx,
-            ClusterNode mvccCrd,
+            MvccCoordinator mvccCrd,
             IgniteLogger log) {
             assert mvccCrd == null || qry.mvccVersion() != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 1b8a509..5c11a4b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1671,20 +1671,35 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * TODO IGNITE-3478.
-     *
      * @throws Exception If failed.
      */
-    public void testReadInProgressCoordinatorFails() throws Exception {
+    public void testReadInProgressCoordinatorFails_FromServer() throws 
Exception {
+        readInProgressCoordinatorFails(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFails_FromClient() throws 
Exception {
+        readInProgressCoordinatorFails(true);
+    }
+
+    /**
+     * @param fromClient {@code True} if read from client node, otherwise from 
server node.
+     * @throws Exception If failed.
+     */
+    private void readInProgressCoordinatorFails(boolean fromClient) throws 
Exception {
         testSpi = true;
 
         startGrids(4);
 
         client = true;
 
-        final Ignite client = startGrid(4);
+        assertTrue(startGrid(4).configuration().isClientMode());
+
+        final Ignite getNode = fromClient ? ignite(4) : ignite(1);
 
-        final IgniteCache cache = 
client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
+        final IgniteCache cache = 
getNode.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
             setNodeFilter(new 
TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), 
getTestIgniteInstanceName(1))));
 
         final Set<Integer> keys = new HashSet<>();
@@ -1699,15 +1714,15 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         for (Integer key : keys)
             vals.put(key, -1);
 
-        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+        try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
             cache.putAll(vals);
 
             tx.commit();
         }
 
-        final TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(client);
+        final TestRecordingCommunicationSpi getNodeSpi = 
TestRecordingCommunicationSpi.spi(getNode);
 
-        clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+        getNodeSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() 
{
             @Override public boolean apply(ClusterNode node, Message msg) {
                 return msg instanceof GridNearGetRequest;
             }
@@ -1734,17 +1749,17 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             }
         }, "get-thread");
 
-        clientSpi.waitForBlocked();
+        getNodeSpi.waitForBlocked();
 
         final IgniteInternalFuture releaseWaitFut = GridTestUtils.runAsync(new 
Callable() {
             @Override public Object call() throws Exception {
                 Thread.sleep(3000);
 
-                clientSpi.stopBlock(true);
+                getNodeSpi.stopBlock(true);
 
                 return null;
             }
-        }, "get-thread");
+        }, "stop-block");
 
         stopGrid(0);
 
@@ -1754,7 +1769,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             for (Integer key : keys)
                 vals.put(key, i);
 
-            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
                 cache.putAll(vals);
 
                 tx.commit();
@@ -1763,6 +1778,9 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         releaseWaitFut.get();
         getFut.get();
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
     }
 
     /**
@@ -2197,15 +2215,34 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     private void checkActiveQueriesCleanup(Ignite node) throws Exception {
         final CacheCoordinatorsSharedManager crd = 
((IgniteKernal)node).context().cache().context().coordinators();
 
-        assertTrue(GridTestUtils.waitForCondition(
+        assertTrue("Active queries not empty", GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
                     Map activeQrys = GridTestUtils.getFieldValue(crd, 
"activeQueries");
 
                     return activeQrys.isEmpty();
                 }
-            }, 5000)
+            }, 5_000)
+        );
+        assertTrue("Previous coordinator queries not empty", 
GridTestUtils.waitForCondition(
+            new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    Map prevCrdQueries = GridTestUtils.getFieldValue(crd, 
"prevCrdQueries", "activeQueries");
+
+                    return prevCrdQueries.isEmpty();
+                }
+            }, 5_000)
         );
+
+        if (crd.currentCoordinatorNode().equals(node.cluster().localNode())) {
+            assertTrue("prevQueriesDone flag is not set", 
GridTestUtils.waitForCondition(
+                new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return GridTestUtils.getFieldValue(crd, 
"prevCrdQueries", "prevQueriesDone");
+                    }
+                }, 5_000)
+            );
+        }
     }
 
     /**

Reply via email to