Repository: ignite Updated Branches: refs/heads/ignite-3479 c964314a8 -> 7607029cc
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7607029c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7607029c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7607029c Branch: refs/heads/ignite-3479 Commit: 7607029cceca82b685460efad98a22d7c947435b Parents: c964314 Author: sboikov <[email protected]> Authored: Thu Sep 28 12:06:56 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Sep 28 13:54:59 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ExchangeContext.java | 14 +- .../GridDhtPartitionsExchangeFuture.java | 5 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 315 ++++++++++++------- .../cache/mvcc/PreviousCoordinatorQueries.java | 44 ++- .../cache/mvcc/CacheMvccTransactionsTest.java | 28 +- 5 files changed, 268 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index 67bf9ce..55ffdaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -63,6 +63,7 @@ public class ExchangeContext { /** * @param crd Coordinator flag. + * @param newMvccCrd {@code True} if new coordinator assigned during this exchange. * @param fut Exchange future. */ public ExchangeContext(boolean crd, boolean newMvccCrd, GridDhtPartitionsExchangeFuture fut) { @@ -143,18 +144,25 @@ public class ExchangeContext { return newMvccCrd; } + /** + * @return Active queries. + */ public Map<UUID, Map<MvccCounter, Integer>> activeQueries() { return activeQueries; } - public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries0) { - if (activeQueries0 == null) + /** + * @param nodeId Node ID. + * @param nodeQueries Node queries. + */ + public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) { + if (nodeQueries == null) return; if (activeQueries == null) activeQueries = new HashMap<>(); - activeQueries.put(nodeId, activeQueries0); + activeQueries.put(nodeId, nodeQueries); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 01ec408..830d50b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -806,7 +806,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (exchCtx.newMvccCoordinator()) { assert mvccCrd != null; - Map<MvccCounter, Integer> activeQrys = null; + Map<MvccCounter, Integer> activeQrys = new HashMap<>(); for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) { if (fut instanceof MvccQueryAware) { @@ -815,9 +815,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (ver != null ) { MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter()); - if (activeQrys == null) - activeQrys = new HashMap<>(); - Integer cnt = activeQrys.get(cntr); if (cnt == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index e2d2183..b50b0a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.Collection; +import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,7 +33,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; -import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; @@ -42,7 +43,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridAtomicLong; @@ -92,7 +92,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>(); /** */ - private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>(); + private final ActiveQueries activeQueries = new ActiveQueries(); /** */ private final PreviousCoordinatorQueries prevCrdQueries = new PreviousCoordinatorQueries(); @@ -118,23 +118,6 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** */ private StatCounter[] statCntrs; - /** - * @param ver1 First version. - * @param ver2 Second version. - * @return Comparison result. - */ - public static int compareVersions(MvccCoordinatorVersion ver1, MvccCoordinatorVersion ver2) { - assert ver1 != null; - assert ver2 != null; - - int cmp = Long.compare(ver1.coordinatorVersion(), ver2.coordinatorVersion()); - - if (cmp != 0) - return cmp; - - return Long.compare(ver1.counter(), ver2.counter()); - } - /** */ private CacheCoordinatorsDiscoveryData discoData = new CacheCoordinatorsDiscoveryData(null); @@ -146,6 +129,24 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + statCntrs = new StatCounter[7]; + + statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); + statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime"); + statCntrs[2] = new StatCounter("CoordinatorTxAckRequest"); + statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime"); + statCntrs[4] = new StatCounter("TotalRequests"); + statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); + statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime"); + + ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(), + EVT_NODE_FAILED, EVT_NODE_LEFT); + + ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); + } + + /** {@inheritDoc} */ @Override public DiscoveryDataExchangeType discoveryDataType() { return DiscoveryDataExchangeType.CACHE_CRD_PROC; } @@ -172,6 +173,11 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { return discoData; } + /** + * @param evtType Event type. + * @param nodes Current nodes. + * @param topVer Topology version. + */ public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) { MvccCoordinator crd = discoData.coordinator(); @@ -200,24 +206,6 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } } - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - statCntrs = new StatCounter[7]; - - statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); - statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime"); - statCntrs[2] = new StatCounter("CoordinatorTxAckRequest"); - statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime"); - statCntrs[4] = new StatCounter("TotalRequests"); - statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); - statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime"); - - ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(), - EVT_NODE_FAILED, EVT_NODE_LEFT); - - ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); - } - /** * @param log Logger. */ @@ -344,7 +332,6 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { assert txs != null && txs.size() > 0; // TODO IGNITE-3478: special case for local? - WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crdId, false); ackFuts.put(fut.id, fut); @@ -482,13 +469,11 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); - - onNodeFailed(nodeId); } catch (IgniteCheckedException e) { U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e); - onQueryDone(res.counter()); + onQueryDone(nodeId, res.counter()); } } @@ -514,10 +499,11 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** + * @param nodeId Node ID. * @param msg Message. */ - private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest msg) { - onQueryDone(msg.counter()); + private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) { + onQueryDone(nodeId, msg.counter()); } /** @@ -603,10 +589,10 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { if (prevCrdQueries.previousQueriesDone()) { cleanupVer = committedCntr.get() - 1; - for (Long qryVer : activeQueries.keySet()) { - if (qryVer <= cleanupVer) - cleanupVer = qryVer - 1; - } + Long qryVer = activeQueries.minimalQueryCounter(); + + if (qryVer != null && qryVer <= cleanupVer) + cleanupVer = qryVer - 1; } else cleanupVer = -1; @@ -634,102 +620,204 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { fut.onDone(); } - static boolean increment(AtomicInteger cntr) { - for (;;) { - int current = cntr.get(); + /** + * + */ + class ActiveQueries { + /** */ + private final Map<UUID, TreeMap<Long, AtomicInteger>> activeQueries = new HashMap<>(); - if (current == 0) - return false; + /** */ + private Long minQry; - if (cntr.compareAndSet(current, current + 1)) - return true; + Long minimalQueryCounter() { + synchronized (this) { + return minQry; + } } - } - /** - * @param qryNodeId Node initiated query. - * @return Counter for query. - */ - private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { - assert crdVer != 0; + synchronized MvccCoordinatorVersionResponse assignQueryCounter(UUID nodeId, long futId) { + MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); + Long mvccCntr; + Long trackCntr; - Long mvccCntr; + for(;;) { + mvccCntr = committedCntr.get(); - for(;;) { - mvccCntr = committedCntr.get(); + trackCntr = mvccCntr; - Long trackCntr = mvccCntr; + for (Long txVer : activeTxs.keySet()) { + if (txVer < trackCntr) + trackCntr = txVer; - for (Long txVer : activeTxs.keySet()) { - if (txVer < trackCntr) - trackCntr = txVer; + res.addTx(txVer); + } - res.addTx(txVer); - } + Long minQry0 = minQry; - registerActiveQuery(trackCntr); + if (minQry == null || trackCntr < minQry) + minQry = trackCntr; - if (committedCntr.get() == mvccCntr) - break; - else { - res.resetTransactionsCount(); + if (committedCntr.get() == mvccCntr) + break; + + minQry = minQry0; - onQueryDone(trackCntr); + res.resetTransactionsCount(); } + + TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId); + + if (nodeMap == null) + activeQueries.put(nodeId, nodeMap = new TreeMap<>()); + + AtomicInteger qryCnt = nodeMap.get(trackCntr); + + if (qryCnt == null) + nodeMap.put(trackCntr, new AtomicInteger(1)); + else + qryCnt.incrementAndGet(); + + res.init(futId, crdVer, mvccCntr, COUNTER_NA); + + return res; } - res.init(futId, crdVer, mvccCntr, COUNTER_NA); + synchronized void onQueryDone(UUID nodeId, Long mvccCntr) { + TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId); - return res; - } + if (nodeMap == null) + return; - private void registerActiveQuery(Long cntr) { - for (;;) { - AtomicInteger qryCnt = activeQueries.get(cntr); + assert minQry != null; - if (qryCnt != null) { - boolean inc = increment(qryCnt); + AtomicInteger qryCnt = nodeMap.get(mvccCntr); - if (!inc) { - activeQueries.remove(mvccCntr, qryCnt); + assert qryCnt != null : "[node=" + nodeId + ", nodeMap=" + nodeMap + ", cntr=" + mvccCntr + "]"; - continue; - } + int left = qryCnt.decrementAndGet(); + + if (left == 0) { + nodeMap.remove(mvccCntr); + + if (mvccCntr == minQry.longValue()) + minQry = activeMinimal(); } - else { - qryCnt = new AtomicInteger(1); + } - if (activeQueries.putIfAbsent(cntr, qryCnt) != null) - continue; + synchronized void onNodeFailed(UUID nodeId) { + activeQueries.remove(nodeId); + + minQry = activeMinimal(); + } + + private Long activeMinimal() { + Long min = null; + + for (TreeMap<Long, AtomicInteger> m : activeQueries.values()) { + Map.Entry<Long, AtomicInteger> e = m.firstEntry(); + + if (e != null && (min == null || e.getKey() < min)) + min = e.getKey(); } - break; + return min; } } - private void onNodeFailed(UUID nodeId) { - // TODO + /** + * @param qryNodeId Node initiated query. + * @return Counter for query. + */ + private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { + assert crdVer != 0; + + return activeQueries.assignQueryCounter(qryNodeId, futId); + +// MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); +// +// Long mvccCntr; +// +// for(;;) { +// mvccCntr = committedCntr.get(); +// +// Long trackCntr = mvccCntr; +// +// for (Long txVer : activeTxs.keySet()) { +// if (txVer < trackCntr) +// trackCntr = txVer; +// +// res.addTx(txVer); +// } +// +// registerActiveQuery(trackCntr); +// +// if (committedCntr.get() == mvccCntr) +// break; +// else { +// res.resetTransactionsCount(); +// +// onQueryDone(trackCntr); +// } +// } +// +// res.init(futId, crdVer, mvccCntr, COUNTER_NA); +// +// return res; } +// +// private void registerActiveQuery(Long mvccCntr) { +// for (;;) { +// AtomicInteger qryCnt = activeQueries.get(mvccCntr); +// +// if (qryCnt != null) { +// boolean inc = increment(qryCnt); +// +// if (!inc) { +// activeQueries.remove(mvccCntr, qryCnt); +// +// continue; +// } +// } +// else { +// qryCnt = new AtomicInteger(1); +// +// if (activeQueries.putIfAbsent(mvccCntr, qryCnt) != null) +// continue; +// } +// +// break; +// } +// } +// +// static boolean increment(AtomicInteger cntr) { +// for (;;) { +// int current = cntr.get(); +// +// if (current == 0) +// return false; +// +// if (cntr.compareAndSet(current, current + 1)) +// return true; +// } +// } /** * @param mvccCntr Query counter. */ - private void onQueryDone(long mvccCntr) { - AtomicInteger cntr = activeQueries.get(mvccCntr); - - assert cntr != null : mvccCntr; - - int left = cntr.decrementAndGet(); - - assert left >= 0 : left; - - if (left == 0) { - boolean rmv = activeQueries.remove(mvccCntr, cntr); - - assert rmv; - } + private void onQueryDone(UUID nodeId, Long mvccCntr) { + activeQueries.onQueryDone(nodeId, mvccCntr); +// AtomicInteger qryCnt = activeQueries.get(mvccCntr); +// +// assert qryCnt != null : mvccCntr; +// +// int left = qryCnt.decrementAndGet(); +// +// assert left >= 0 : left; +// +// if (left == 0) +// activeQueries.remove(mvccCntr, qryCnt); } /** @@ -835,7 +923,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** * @param nodeId Node ID - * @param activeQueries + * @param activeQueries Active queries. */ public void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) { @@ -844,6 +932,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** * @param topVer Topology version. + * @param discoCache Discovery data. * @param activeQueries Current queries. */ public void initCoordinator(AffinityTopologyVersion topVer, @@ -1040,7 +1129,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { else if (msg instanceof CoordinatorFutureResponse) processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg); else if (msg instanceof CoordinatorQueryAckRequest) - processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg); + processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg); else if (msg instanceof CoordinatorQueryVersionRequest) processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg); else if (msg instanceof MvccCoordinatorVersionResponse) http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java index dfe584e..89d7746 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java @@ -50,6 +50,11 @@ class PreviousCoordinatorQueries { /** */ private boolean initDone; + /** + * @param srvNodesQueries Active queries started on server nodes. + * @param discoCache Discovery data. + * @param mgr Discovery manager. + */ void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) { synchronized (this) { assert !initDone; @@ -69,17 +74,24 @@ class PreviousCoordinatorQueries { addAwaitedActiveQueries(e.getKey(), e.getValue()); } - if (initDone) + if (initDone && !prevQueriesDone) prevQueriesDone = activeQueries.isEmpty(); } } + /** + * @return {@code True} if all queries on + */ boolean previousQueriesDone() { return prevQueriesDone; } + /** + * @param nodeId Node ID. + * @param nodeQueries Active queries started on node. + */ private void addAwaitedActiveQueries(UUID nodeId, Map<MvccCounter, Integer> nodeQueries) { - if (nodeQueries == null || prevQueriesDone) + if (F.isEmpty(nodeQueries) || prevQueriesDone) return; Map<MvccCounter, Integer> queries = activeQueries.get(nodeId); @@ -103,10 +115,15 @@ class PreviousCoordinatorQueries { } } - prevQueriesDone = activeQueries.isEmpty(); + if (initDone && !prevQueriesDone) + prevQueriesDone = activeQueries.isEmpty(); } - void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) { + /** + * @param nodeId Node ID. + * @param nodeQueries Active queries started on node. + */ + void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) { synchronized (this) { if (initDone) return; @@ -120,7 +137,10 @@ class PreviousCoordinatorQueries { else initDone = waitNodes.remove(nodeId); - addAwaitedActiveQueries(nodeId, activeQueries); + addAwaitedActiveQueries(nodeId, nodeQueries); + + if (initDone && !prevQueriesDone) + prevQueriesDone = activeQueries.isEmpty(); } } @@ -153,18 +173,16 @@ class PreviousCoordinatorQueries { int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1; - if (initDone) { - if (newQryCnt == 0) { - nodeQueries.remove(cntr); + if (newQryCnt == 0) { + nodeQueries.remove(cntr); - if (nodeQueries.isEmpty()) - activeQueries.remove(nodeId); + if (nodeQueries.isEmpty()) { + activeQueries.remove(nodeId); - prevQueriesDone = activeQueries.isEmpty(); + if (initDone && !prevQueriesDone) + prevQueriesDone = activeQueries.isEmpty(); } } - else - nodeQueries.put(cntr, newQryCnt); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7607029c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index be7d44a..35c9011 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1536,6 +1536,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { stopGrid(1); + checkActiveQueriesCleanup(ignite(0)); + verifyCoordinatorInternalState(); try { @@ -2365,15 +2367,31 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { private void checkActiveQueriesCleanup(Ignite node) throws Exception { final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators(); - assertTrue("Active queries not empty", GridTestUtils.waitForCondition( + assertTrue("Active queries not cleared", GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - Map queries = GridTestUtils.getFieldValue(crd, "activeQueries"); + Object activeQueries = GridTestUtils.getFieldValue(crd, "activeQueries"); - if (!queries.isEmpty()) - log.info("Active queries: " + queries); + synchronized (activeQueries) { + Long minQry = GridTestUtils.getFieldValue(activeQueries, "minQry"); - return queries.isEmpty(); + if (minQry != null) + log.info("Min query: " + minQry); + + Map<Object, Map> queriesMap = GridTestUtils.getFieldValue(activeQueries, "activeQueries"); + + boolean empty = true; + + for (Map.Entry<Object, Map> e : queriesMap.entrySet()) { + if (!e.getValue().isEmpty()) { + empty = false; + + log.info("Active queries: " + e); + } + } + + return empty && minQry == null; + } } }, 8_000) );
