Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 c964314a8 -> 7607029cc


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7607029c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7607029c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7607029c

Branch: refs/heads/ignite-3479
Commit: 7607029cceca82b685460efad98a22d7c947435b
Parents: c964314
Author: sboikov <[email protected]>
Authored: Thu Sep 28 12:06:56 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Sep 28 13:54:59 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/ExchangeContext.java       |  14 +-
 .../GridDhtPartitionsExchangeFuture.java        |   5 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 315 ++++++++++++-------
 .../cache/mvcc/PreviousCoordinatorQueries.java  |  44 ++-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  28 +-
 5 files changed, 268 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 67bf9ce..55ffdaf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -63,6 +63,7 @@ public class ExchangeContext {
 
     /**
      * @param crd Coordinator flag.
+     * @param newMvccCrd {@code True} if new coordinator assigned during this 
exchange.
      * @param fut Exchange future.
      */
     public ExchangeContext(boolean crd, boolean newMvccCrd, 
GridDhtPartitionsExchangeFuture fut) {
@@ -143,18 +144,25 @@ public class ExchangeContext {
         return newMvccCrd;
     }
 
+    /**
+     * @return Active queries.
+     */
     public Map<UUID, Map<MvccCounter, Integer>> activeQueries() {
         return activeQueries;
     }
 
-    public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, 
Integer> activeQueries0) {
-        if (activeQueries0 == null)
+    /**
+     * @param nodeId Node ID.
+     * @param nodeQueries Node queries.
+     */
+    public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, 
Integer> nodeQueries) {
+        if (nodeQueries == null)
             return;
 
         if (activeQueries == null)
             activeQueries = new HashMap<>();
 
-        activeQueries.put(nodeId, activeQueries0);
+        activeQueries.put(nodeId, nodeQueries);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 01ec408..830d50b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -806,7 +806,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         if (exchCtx.newMvccCoordinator()) {
             assert mvccCrd != null;
 
-            Map<MvccCounter, Integer> activeQrys = null;
+            Map<MvccCounter, Integer> activeQrys = new HashMap<>();
 
             for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) {
                 if (fut instanceof MvccQueryAware) {
@@ -815,9 +815,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     if (ver != null ) {
                         MvccCounter cntr = new 
MvccCounter(ver.coordinatorVersion(), ver.counter());
 
-                        if (activeQrys == null)
-                            activeQrys = new HashMap<>();
-
                         Integer cnt = activeQrys.get(cntr);
 
                         if (cnt == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index e2d2183..b50b0a4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -31,7 +33,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -42,7 +43,6 @@ import 
org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridAtomicLong;
@@ -92,7 +92,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = 
new ConcurrentSkipListMap<>();
 
     /** */
-    private final ConcurrentMap<Long, AtomicInteger> activeQueries = new 
ConcurrentHashMap<>();
+    private final ActiveQueries activeQueries = new ActiveQueries();
 
     /** */
     private final PreviousCoordinatorQueries prevCrdQueries = new 
PreviousCoordinatorQueries();
@@ -118,23 +118,6 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     /** */
     private StatCounter[] statCntrs;
 
-    /**
-     * @param ver1 First version.
-     * @param ver2 Second version.
-     * @return Comparison result.
-     */
-    public static int compareVersions(MvccCoordinatorVersion ver1, 
MvccCoordinatorVersion ver2) {
-        assert ver1 != null;
-        assert ver2 != null;
-
-        int cmp = Long.compare(ver1.coordinatorVersion(), 
ver2.coordinatorVersion());
-
-        if (cmp != 0)
-            return cmp;
-
-        return Long.compare(ver1.counter(), ver2.counter());
-    }
-
     /** */
     private CacheCoordinatorsDiscoveryData discoData = new 
CacheCoordinatorsDiscoveryData(null);
 
@@ -146,6 +129,24 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        statCntrs = new StatCounter[7];
+
+        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", 
"avgTxs");
+        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", 
"avgFutTime");
+        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", 
"avgFutTime");
+        statCntrs[4] = new StatCounter("TotalRequests");
+        statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
+        statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", 
"avgFutTime");
+
+        ctx.event().addLocalEventListener(new 
CacheCoordinatorNodeFailListener(),
+            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        ctx.io().addMessageListener(MSG_TOPIC, new 
CoordinatorMessageListener());
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoveryDataExchangeType discoveryDataType() {
         return DiscoveryDataExchangeType.CACHE_CRD_PROC;
     }
@@ -172,6 +173,11 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
         return discoData;
     }
 
+    /**
+     * @param evtType Event type.
+     * @param nodes Current nodes.
+     * @param topVer Topology version.
+     */
     public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, 
long topVer) {
         MvccCoordinator crd = discoData.coordinator();
 
@@ -200,24 +206,6 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        statCntrs = new StatCounter[7];
-
-        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", 
"avgTxs");
-        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", 
"avgFutTime");
-        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
-        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", 
"avgFutTime");
-        statCntrs[4] = new StatCounter("TotalRequests");
-        statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
-        statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", 
"avgFutTime");
-
-        ctx.event().addLocalEventListener(new 
CacheCoordinatorNodeFailListener(),
-            EVT_NODE_FAILED, EVT_NODE_LEFT);
-
-        ctx.io().addMessageListener(MSG_TOPIC, new 
CoordinatorMessageListener());
-    }
-    
     /**
      * @param log Logger.
      */
@@ -344,7 +332,6 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
         assert txs != null && txs.size() > 0;
 
         // TODO IGNITE-3478: special case for local?
-
         WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), 
crdId, false);
 
         ackFuts.put(fut.id, fut);
@@ -482,13 +469,11 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
         catch (ClusterTopologyCheckedException e) {
             if (log.isDebugEnabled())
                 log.debug("Failed to send query counter response, node left 
[msg=" + msg + ", node=" + nodeId + ']');
-
-            onNodeFailed(nodeId);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send query counter response [msg=" + msg + 
", node=" + nodeId + ']', e);
 
-            onQueryDone(res.counter());
+            onQueryDone(nodeId, res.counter());
         }
     }
 
@@ -514,10 +499,11 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param nodeId Node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest 
msg) {
-        onQueryDone(msg.counter());
+    private void processCoordinatorQueryAckRequest(UUID nodeId, 
CoordinatorQueryAckRequest msg) {
+        onQueryDone(nodeId, msg.counter());
     }
 
     /**
@@ -603,10 +589,10 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
         if (prevCrdQueries.previousQueriesDone()) {
             cleanupVer = committedCntr.get() - 1;
 
-            for (Long qryVer : activeQueries.keySet()) {
-                if (qryVer <= cleanupVer)
-                    cleanupVer = qryVer - 1;
-            }
+            Long qryVer = activeQueries.minimalQueryCounter();
+
+            if (qryVer != null && qryVer <= cleanupVer)
+                cleanupVer = qryVer - 1;
         }
         else
             cleanupVer = -1;
@@ -634,102 +620,204 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
             fut.onDone();
     }
 
-    static boolean increment(AtomicInteger cntr) {
-        for (;;) {
-            int current = cntr.get();
+    /**
+     *
+     */
+    class ActiveQueries {
+        /** */
+        private final Map<UUID, TreeMap<Long, AtomicInteger>> activeQueries = 
new HashMap<>();
 
-            if (current == 0)
-                return false;
+        /** */
+        private Long minQry;
 
-            if (cntr.compareAndSet(current, current + 1))
-                return true;
+        Long minimalQueryCounter() {
+            synchronized (this) {
+                return minQry;
+            }
         }
-    }
 
-    /**
-     * @param qryNodeId Node initiated query.
-     * @return Counter for query.
-     */
-    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, 
long futId) {
-        assert crdVer != 0;
+        synchronized MvccCoordinatorVersionResponse assignQueryCounter(UUID 
nodeId, long futId) {
+            MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
 
-        MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
+            Long mvccCntr;
+            Long trackCntr;
 
-        Long mvccCntr;
+            for(;;) {
+                mvccCntr = committedCntr.get();
 
-        for(;;) {
-            mvccCntr = committedCntr.get();
+                trackCntr = mvccCntr;
 
-            Long trackCntr = mvccCntr;
+                for (Long txVer : activeTxs.keySet()) {
+                    if (txVer < trackCntr)
+                        trackCntr = txVer;
 
-            for (Long txVer : activeTxs.keySet()) {
-                if (txVer < trackCntr)
-                    trackCntr = txVer;
+                    res.addTx(txVer);
+                }
 
-                res.addTx(txVer);
-            }
+                Long minQry0 = minQry;
 
-            registerActiveQuery(trackCntr);
+                if (minQry == null || trackCntr < minQry)
+                    minQry = trackCntr;
 
-            if (committedCntr.get() == mvccCntr)
-                break;
-            else {
-                res.resetTransactionsCount();
+                if (committedCntr.get() == mvccCntr)
+                    break;
+
+                minQry = minQry0;
 
-                onQueryDone(trackCntr);
+                res.resetTransactionsCount();
             }
+
+            TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId);
+
+            if (nodeMap == null)
+                activeQueries.put(nodeId, nodeMap = new TreeMap<>());
+
+            AtomicInteger qryCnt = nodeMap.get(trackCntr);
+
+            if (qryCnt == null)
+                nodeMap.put(trackCntr, new AtomicInteger(1));
+            else
+                qryCnt.incrementAndGet();
+
+            res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+
+            return res;
         }
 
-        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+        synchronized void onQueryDone(UUID nodeId, Long mvccCntr) {
+            TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId);
 
-        return res;
-    }
+            if (nodeMap == null)
+                return;
 
-    private void registerActiveQuery(Long cntr) {
-        for (;;) {
-            AtomicInteger qryCnt = activeQueries.get(cntr);
+            assert minQry != null;
 
-            if (qryCnt != null) {
-                boolean inc = increment(qryCnt);
+            AtomicInteger qryCnt = nodeMap.get(mvccCntr);
 
-                if (!inc) {
-                    activeQueries.remove(mvccCntr, qryCnt);
+            assert qryCnt != null : "[node=" + nodeId + ", nodeMap=" + nodeMap 
+ ", cntr=" + mvccCntr + "]";
 
-                    continue;
-                }
+            int left = qryCnt.decrementAndGet();
+
+            if (left == 0) {
+                nodeMap.remove(mvccCntr);
+
+                if (mvccCntr == minQry.longValue())
+                    minQry = activeMinimal();
             }
-            else {
-                qryCnt = new AtomicInteger(1);
+        }
 
-                if (activeQueries.putIfAbsent(cntr, qryCnt) != null)
-                    continue;
+        synchronized void onNodeFailed(UUID nodeId) {
+            activeQueries.remove(nodeId);
+
+            minQry = activeMinimal();
+        }
+
+        private Long activeMinimal() {
+            Long min = null;
+
+            for (TreeMap<Long, AtomicInteger> m : activeQueries.values()) {
+                Map.Entry<Long, AtomicInteger> e = m.firstEntry();
+
+                if (e != null && (min == null || e.getKey() < min))
+                    min = e.getKey();
             }
 
-            break;
+            return min;
         }
     }
 
-    private void onNodeFailed(UUID nodeId) {
-        // TODO
+    /**
+     * @param qryNodeId Node initiated query.
+     * @return Counter for query.
+     */
+    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, 
long futId) {
+        assert crdVer != 0;
+
+        return activeQueries.assignQueryCounter(qryNodeId, futId);
+
+//        MvccCoordinatorVersionResponse res = new 
MvccCoordinatorVersionResponse();
+//
+//        Long mvccCntr;
+//
+//        for(;;) {
+//            mvccCntr = committedCntr.get();
+//
+//            Long trackCntr = mvccCntr;
+//
+//            for (Long txVer : activeTxs.keySet()) {
+//                if (txVer < trackCntr)
+//                    trackCntr = txVer;
+//
+//                res.addTx(txVer);
+//            }
+//
+//            registerActiveQuery(trackCntr);
+//
+//            if (committedCntr.get() == mvccCntr)
+//                break;
+//            else {
+//                res.resetTransactionsCount();
+//
+//                onQueryDone(trackCntr);
+//            }
+//        }
+//
+//        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+//
+//        return res;
     }
+//
+//    private void registerActiveQuery(Long mvccCntr) {
+//        for (;;) {
+//            AtomicInteger qryCnt = activeQueries.get(mvccCntr);
+//
+//            if (qryCnt != null) {
+//                boolean inc = increment(qryCnt);
+//
+//                if (!inc) {
+//                    activeQueries.remove(mvccCntr, qryCnt);
+//
+//                    continue;
+//                }
+//            }
+//            else {
+//                qryCnt = new AtomicInteger(1);
+//
+//                if (activeQueries.putIfAbsent(mvccCntr, qryCnt) != null)
+//                    continue;
+//            }
+//
+//            break;
+//        }
+//    }
+//
+//    static boolean increment(AtomicInteger cntr) {
+//        for (;;) {
+//            int current = cntr.get();
+//
+//            if (current == 0)
+//                return false;
+//
+//            if (cntr.compareAndSet(current, current + 1))
+//                return true;
+//        }
+//    }
 
     /**
      * @param mvccCntr Query counter.
      */
-    private void onQueryDone(long mvccCntr) {
-        AtomicInteger cntr = activeQueries.get(mvccCntr);
-
-        assert cntr != null : mvccCntr;
-
-        int left = cntr.decrementAndGet();
-
-        assert left >= 0 : left;
-
-        if (left == 0) {
-            boolean rmv = activeQueries.remove(mvccCntr, cntr);
-
-            assert rmv;
-        }
+    private void onQueryDone(UUID nodeId, Long mvccCntr) {
+        activeQueries.onQueryDone(nodeId, mvccCntr);
+//        AtomicInteger qryCnt = activeQueries.get(mvccCntr);
+//
+//        assert qryCnt != null : mvccCntr;
+//
+//        int left = qryCnt.decrementAndGet();
+//
+//        assert left >= 0 : left;
+//
+//        if (left == 0)
+//            activeQueries.remove(mvccCntr, qryCnt);
     }
 
     /**
@@ -835,7 +923,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
 
     /**
      * @param nodeId Node ID
-     * @param activeQueries
+     * @param activeQueries Active queries.
      */
     public void processClientActiveQueries(UUID nodeId,
         @Nullable Map<MvccCounter, Integer> activeQueries) {
@@ -844,6 +932,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
 
     /**
      * @param topVer Topology version.
+     * @param discoCache Discovery data.
      * @param activeQueries Current queries.
      */
     public void initCoordinator(AffinityTopologyVersion topVer,
@@ -1040,7 +1129,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
             else if (msg instanceof CoordinatorFutureResponse)
                 processCoordinatorAckResponse(nodeId, 
(CoordinatorFutureResponse)msg);
             else if (msg instanceof CoordinatorQueryAckRequest)
-                
processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
+                processCoordinatorQueryAckRequest(nodeId, 
(CoordinatorQueryAckRequest)msg);
             else if (msg instanceof CoordinatorQueryVersionRequest)
                 processCoordinatorQueryVersionRequest(nodeId, 
(CoordinatorQueryVersionRequest)msg);
             else if (msg instanceof MvccCoordinatorVersionResponse)

http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index dfe584e..89d7746 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -50,6 +50,11 @@ class PreviousCoordinatorQueries {
     /** */
     private boolean initDone;
 
+    /**
+     * @param srvNodesQueries Active queries started on server nodes.
+     * @param discoCache Discovery data.
+     * @param mgr Discovery manager.
+     */
     void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache 
discoCache, GridDiscoveryManager mgr) {
         synchronized (this) {
             assert !initDone;
@@ -69,17 +74,24 @@ class PreviousCoordinatorQueries {
                     addAwaitedActiveQueries(e.getKey(), e.getValue());
             }
 
-            if (initDone)
+            if (initDone && !prevQueriesDone)
                 prevQueriesDone = activeQueries.isEmpty();
         }
     }
 
+    /**
+     * @return {@code True} if all queries on
+     */
     boolean previousQueriesDone() {
         return prevQueriesDone;
     }
 
+    /**
+     * @param nodeId Node ID.
+     * @param nodeQueries Active queries started on node.
+     */
     private void addAwaitedActiveQueries(UUID nodeId, Map<MvccCounter, 
Integer> nodeQueries) {
-        if (nodeQueries == null || prevQueriesDone)
+        if (F.isEmpty(nodeQueries) || prevQueriesDone)
             return;
 
         Map<MvccCounter, Integer> queries = activeQueries.get(nodeId);
@@ -103,10 +115,15 @@ class PreviousCoordinatorQueries {
             }
         }
 
-        prevQueriesDone = activeQueries.isEmpty();
+        if (initDone && !prevQueriesDone)
+            prevQueriesDone = activeQueries.isEmpty();
     }
 
-    void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, 
Integer> activeQueries) {
+    /**
+     * @param nodeId Node ID.
+     * @param nodeQueries Active queries started on node.
+     */
+    void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, 
Integer> nodeQueries) {
         synchronized (this) {
             if (initDone)
                 return;
@@ -120,7 +137,10 @@ class PreviousCoordinatorQueries {
             else
                 initDone = waitNodes.remove(nodeId);
 
-            addAwaitedActiveQueries(nodeId, activeQueries);
+            addAwaitedActiveQueries(nodeId, nodeQueries);
+
+            if (initDone && !prevQueriesDone)
+                prevQueriesDone = activeQueries.isEmpty();
         }
     }
 
@@ -153,18 +173,16 @@ class PreviousCoordinatorQueries {
 
             int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
 
-            if (initDone) {
-                if (newQryCnt == 0) {
-                    nodeQueries.remove(cntr);
+            if (newQryCnt == 0) {
+                nodeQueries.remove(cntr);
 
-                    if (nodeQueries.isEmpty())
-                        activeQueries.remove(nodeId);
+                if (nodeQueries.isEmpty()) {
+                    activeQueries.remove(nodeId);
 
-                    prevQueriesDone = activeQueries.isEmpty();
+                    if (initDone && !prevQueriesDone)
+                        prevQueriesDone = activeQueries.isEmpty();
                 }
             }
-            else
-                nodeQueries.put(cntr, newQryCnt);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index be7d44a..35c9011 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1536,6 +1536,8 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         stopGrid(1);
 
+        checkActiveQueriesCleanup(ignite(0));
+
         verifyCoordinatorInternalState();
 
         try {
@@ -2365,15 +2367,31 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     private void checkActiveQueriesCleanup(Ignite node) throws Exception {
         final CacheCoordinatorsProcessor crd = 
((IgniteKernal)node).context().cache().context().coordinators();
 
-        assertTrue("Active queries not empty", GridTestUtils.waitForCondition(
+        assertTrue("Active queries not cleared", 
GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    Map queries = GridTestUtils.getFieldValue(crd, 
"activeQueries");
+                    Object activeQueries = GridTestUtils.getFieldValue(crd, 
"activeQueries");
 
-                    if (!queries.isEmpty())
-                        log.info("Active queries: " + queries);
+                    synchronized (activeQueries) {
+                        Long minQry = 
GridTestUtils.getFieldValue(activeQueries, "minQry");
 
-                    return queries.isEmpty();
+                        if (minQry != null)
+                            log.info("Min query: " + minQry);
+
+                        Map<Object, Map> queriesMap = 
GridTestUtils.getFieldValue(activeQueries, "activeQueries");
+
+                        boolean empty = true;
+
+                        for (Map.Entry<Object, Map> e : queriesMap.entrySet()) 
{
+                            if (!e.getValue().isEmpty()) {
+                                empty = false;
+
+                                log.info("Active queries: " + e);
+                            }
+                        }
+
+                        return empty && minQry == null;
+                    }
                 }
             }, 8_000)
         );

Reply via email to