Repository: ignite Updated Branches: refs/heads/ignite-3479 331a255cd -> 537a3ecb2
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/537a3ecb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/537a3ecb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/537a3ecb Branch: refs/heads/ignite-3479 Commit: 537a3ecb2218c8676ac14886b7aa04081b975f7c Parents: 331a255 Author: sboikov <[email protected]> Authored: Wed Sep 27 12:21:14 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 27 15:10:40 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 12 ++ .../affinity/GridAffinityAssignmentCache.java | 4 +- .../cache/CacheAffinitySharedManager.java | 17 +- .../processors/cache/ExchangeContext.java | 33 ++-- .../dht/GridPartitionedGetFuture.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 18 +- .../mvcc/CacheCoordinatorsSharedManager.java | 118 ++++++++++--- .../processors/cache/mvcc/MvccCoordinator.java | 48 ++++-- .../processors/cache/mvcc/MvccCounter.java | 73 +++++++- .../mvcc/NewCoordinatorQueryAckRequest.java | 156 +++++++++++++++++ .../cache/mvcc/PreviousCoordinatorQueries.java | 170 +++++++++++++++++++ .../query/GridCacheDistributedQueryManager.java | 7 +- .../cache/query/GridCacheQueryManager.java | 13 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 65 +++++-- 14 files changed, 639 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 22d2779..99bc8af 100644 
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -110,6 +110,8 @@ import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureRespons import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; +import org.apache.ignite.internal.processors.cache.mvcc.NewCoordinatorQueryAckRequest; import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; @@ -929,6 +931,16 @@ public class GridIoMessageFactory implements MessageFactory { return msg; + case 140: + msg = new NewCoordinatorQueryAckRequest(); + + return msg; + + case 141: + msg = new MvccCounter(); + + return msg; + // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 83837b8..fb4092a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -195,10 
+195,12 @@ public class GridAffinityAssignmentCache { * * @param topVer Topology version. * @param affAssignment Affinity assignment for topology version. + * @param mvccCrd Mvcc coordinator. */ - private void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) { + public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) { assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']'; assert idealAssignment != null; + assert mvccCrd == null || topVer.compareTo(mvccCrd.topologyVersion()) >= 0 : "[mvccCrd=" + mvccCrd + ", topVer=" + topVer + ']'; GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment, mvccCrd); http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 99727e6..1f9890c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridLongList; @@ -499,6 +500,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert grp != null; GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer, + cctx.coordinators().currentCoordinator(), null, discoCache, grp.affinity(), @@ -1188,6 +1190,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap fetchFut.init(false); fetchAffinity(evts.topologyVersion(), + cctx.coordinators().currentCoordinator(), evts.lastEvent(), evts.discoveryCache(), aff, fetchFut); @@ -1536,6 +1539,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap int grpId = fetchFut.groupId(); fetchAffinity(topVer, + cctx.coordinators().currentCoordinator(), fut.events().lastEvent(), fut.events().discoveryCache(), cctx.cache().cacheGroup(grpId).affinity(), @@ -1545,6 +1549,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param topVer Topology version. + * @param mvccCrd Mvcc coordinator to set in affinity. * @param discoveryEvt Discovery event. * @param discoCache Discovery data cache. * @param affCache Affinity. @@ -1552,7 +1557,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. * @return Affinity assignment response. 
*/ - private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer, + private GridDhtAffinityAssignmentResponse fetchAffinity( + AffinityTopologyVersion topVer, + MvccCoordinator mvccCrd, @Nullable DiscoveryEvent discoveryEvt, DiscoCache discoCache, GridAffinityAssignmentCache affCache, @@ -1565,7 +1572,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (res == null) { List<List<ClusterNode>> aff = affCache.calculate(topVer, discoveryEvt, discoCache); - affCache.initialize(topVer, aff); + affCache.initialize(topVer, aff, mvccCrd); } else { List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache); @@ -1582,7 +1589,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff != null : res; - affCache.initialize(topVer, aff); + affCache.initialize(topVer, aff, mvccCrd); } return res; @@ -1632,7 +1639,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. * @return Future completed when caches initialization is done. */ - public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut, + public IgniteInternalFuture<?> initCoordinatorCaches( + final GridDhtPartitionsExchangeFuture fut, final boolean newAff) throws IgniteCheckedException { final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); @@ -1700,6 +1708,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut) throws IgniteCheckedException { fetchAffinity(prev.topologyVersion(), + null, // Pass null mvcc coordinator, this affinity version should be used for queries. 
prev.events().lastEvent(), prev.events().discoveryCache(), aff, http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index 36ce6ef..67bf9ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; @@ -56,8 +58,8 @@ public class ExchangeContext { /** */ private final boolean newMvccCrd; - /** */ - private Map<MvccCounter, Integer> activeQrys; + /** Currently running mvcc queries, initialized when mvcc coordinator is changed. */ + private Map<UUID, Map<MvccCounter, Integer>> activeQueries; /** * @param crd Coordinator flag. 
@@ -71,7 +73,7 @@ public class ExchangeContext { if (compatibilityNode || (crd && fut.localJoinExchange())) { fetchAffOnJoin = true; - merge = !newMvccCrd; + merge = false; } else { boolean startCaches = fut.exchangeId().isJoined() && @@ -79,8 +81,7 @@ public class ExchangeContext { fetchAffOnJoin = protocolVer == 1; - merge = !newMvccCrd && - !startCaches && + merge = !startCaches && protocolVer > 1 && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT; } @@ -142,26 +143,18 @@ public class ExchangeContext { return newMvccCrd; } - public Map<MvccCounter, Integer> activeQueries() { - return activeQrys; + public Map<UUID, Map<MvccCounter, Integer>> activeQueries() { + return activeQueries; } - public void addActiveQueries(Map<MvccCounter, Integer> activeQrys0) { - if (activeQrys0 == null) + public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries0) { + if (activeQueries0 == null) return; - if (activeQrys != null) { - for (Map.Entry<MvccCounter, Integer> e : activeQrys0.entrySet()) { - Integer cnt = activeQrys.get(e.getKey()); + if (activeQueries == null) + activeQueries = new HashMap<>(); - if (cnt == null) - activeQrys.put(e.getKey(), e.getValue()); - else - activeQrys.put(e.getKey(), cnt + e.getValue()); - } - } - else - activeQrys = activeQrys0; + activeQueries.put(nodeId, activeQueries0); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 1476b2a..37e9feb6 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -335,7 +335,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } if (mvccVer0 != null) - cctx.shared().coordinators().ackQueryDone(mvccCrd0.node(), mvccVer0); + cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0); } cache().sendTtlUpdateRequest(expiryPlc); http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1642263..51da7a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -841,7 +841,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - exchCtx.addActiveQueries(activeQrys); + exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys); } } @@ -1304,7 +1304,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.partitionHistoryCounters(partHistReserved0); } - msg.activeQueries(exchCtx.activeQueries()); + Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries(); + + msg.activeQueries(activeQueries != null ? 
activeQueries.get(cctx.localNodeId()) : null); if (stateChangeExchange() && changeGlobalStateE != null) msg.setError(changeGlobalStateE); @@ -1482,7 +1484,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (err == null) { if (exchCtx.newMvccCoordinator() && cctx.localNode().equals(cctx.coordinators().currentCoordinatorNode())) - cctx.coordinators().initCoordinator(res, exchCtx.activeQueries()); + cctx.coordinators().initCoordinator(res, exchCtx.events().discoveryCache(), exchCtx.activeQueries()); if (centralizedAff) { assert !exchCtx.mergeExchanges(); @@ -1904,6 +1906,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { if (msg.client()) { + if (msg.activeQueries() != null) + cctx.coordinators().processClientActiveQueries(nodeId, msg.activeQueries()); + waitAndReplyToNode(nodeId, msg); return; @@ -2252,7 +2257,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (exchCtx.mergeExchanges()) { + if (exchCtx.mergeExchanges() && !exchCtx.newMvccCoordinator()) { if (log.isInfoEnabled()) log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']'); @@ -2324,7 +2329,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { GridDhtPartitionsSingleMessage msg = e.getValue(); - exchCtx.addActiveQueries(msg.activeQueries()); + if (exchCtx.newMvccCoordinator()) + exchCtx.addActiveQueries(e.getKey(), msg.activeQueries()); + else + assert msg.activeQueries() == null; // Apply update counters after all single messages are received. 
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 2bf653c..f144437 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; import org.jsr166.LongAdder8; @@ -87,6 +88,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>(); /** */ + private final PreviousCoordinatorQueries prevCrdQueries = new PreviousCoordinatorQueries(); + + /** */ private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>(); /** */ @@ -138,7 +142,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime"); - cctx.gridEvents().addLocalEventListener(new CacheCoordinatorDiscoveryListener(), + cctx.gridEvents().addLocalEventListener(new 
CacheCoordinatorNodeFailListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); cctx.gridIO().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); @@ -169,9 +173,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * @param crd Coordinator. * @param lsnr Response listener. + * @param txVer Transaction version. * @return Counter request future. */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, MvccResponseListener lsnr, GridCacheVersion txVer) { + public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, + MvccResponseListener lsnr, + GridCacheVersion txVer) { assert !crd.isLocal() : crd; MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), @@ -197,32 +204,37 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param crd Coordinator. * @param mvccVer Query version. */ - public void ackQueryDone(ClusterNode crd, MvccCoordinatorVersion mvccVer) { - try { - long trackCntr = mvccVer.counter(); + public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) { + assert crd != null; - MvccLongList txs = mvccVer.activeTransactions(); + long trackCntr = mvccVer.counter(); - if (txs != null) { - for (int i = 0; i < txs.size(); i++) { - long txId = txs.get(i); + MvccLongList txs = mvccVer.activeTransactions(); - if (txId < trackCntr) - trackCntr = txId; - } + if (txs != null) { + for (int i = 0; i < txs.size(); i++) { + long txId = txs.get(i); + + if (txId < trackCntr) + trackCntr = txId; } + } - cctx.gridIO().sendToGridTopic(crd, + Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? 
new CoordinatorQueryAckRequest(trackCntr) : + new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr); + + try { + cctx.gridIO().sendToGridTopic(crd.node(), MSG_TOPIC, - new CoordinatorQueryAckRequest(trackCntr), + msg, MSG_POLICY); } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) - log.debug("Failed to send query ack, node left [crd=" + crd + ']'); + log.debug("Failed to send query ack, node left [crd=" + crd + ", msg=" + msg + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + mvccVer + ']', e); + U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e); } } @@ -401,7 +413,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager if (log.isDebugEnabled()) log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); - onQueryDone(res.counter()); + onNodeFailed(nodeId); } catch (IgniteCheckedException e) { U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e); @@ -439,6 +451,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } /** + * @param nodeId Node ID. + * @param msg Message. + */ + private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) { + prevCrdQueries.onQueryDone(nodeId, msg); + } + + /** * @param nodeId Sender node ID. * @param msg Message. 
*/ @@ -508,12 +528,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager assert old == null : txId; - long cleanupVer = committedCntr.get() - 1; + long cleanupVer; + + if (prevCrdQueries.previousQueriesDone()) { + cleanupVer = committedCntr.get() - 1; - for (Long qryVer : activeQueries.keySet()) { - if (qryVer <= cleanupVer) - cleanupVer = qryVer - 1; + for (Long qryVer : activeQueries.keySet()) { + if (qryVer <= cleanupVer) + cleanupVer = qryVer - 1; + } } + else + cleanupVer = -1; res.init(futId, crdVer, nextCtr, cleanupVer); @@ -613,6 +639,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } } + private void onNodeFailed(UUID nodeId) { + // TODO + } + /** * @param mvccCntr Query counter. */ @@ -709,34 +739,60 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager return curCrd != null ? curCrd.node() : null; } + /** + * @param topVer Cache affinity version (used for assert). + * @return Coordinator. + */ public MvccCoordinator currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) { MvccCoordinator crd = curCrd; // Assert coordinator did not already change. - assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 : crd; + assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 : + "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']'; return crd; } /** * @param discoCache Discovery snapshot. + * @return New coordinator. 
*/ public MvccCoordinator reassignCoordinator(DiscoCache discoCache) { assert curCrd == null || !discoCache.allNodes().contains(curCrd.node()) : curCrd; if (!discoCache.serverNodes().isEmpty()) { - curCrd = new MvccCoordinator(discoCache.serverNodes().get(0), discoCache.version()); + curCrd = new MvccCoordinator(discoCache.serverNodes().get(0), + discoCache.version().topologyVersion(), + discoCache.version()); - log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() + - ", crd=" + curCrd.node().id() + ']'); + log.info("Assigned mvcc coordinator: " + curCrd); } - else + else { curCrd = null; + log.info("New mvcc coordinator was not assigned [topVer=" + discoCache.version() + ']'); + } + return curCrd; } - public void initCoordinator(AffinityTopologyVersion topVer, @Nullable Map<MvccCounter, Integer> activeQrys) { + /** + * @param nodeId Node ID. + * @param activeQueries Active queries. + */ + public void processClientActiveQueries(UUID nodeId, + @Nullable Map<MvccCounter, Integer> activeQueries) { + prevCrdQueries.processClientActiveQueries(nodeId, activeQueries); + } + + /** + * @param topVer Topology version. + * @param activeQueries Current queries. 
+ */ + public void initCoordinator(AffinityTopologyVersion topVer, + DiscoCache discoCache, + Map<UUID, Map<MvccCounter, Integer>> activeQueries) + { assert cctx.localNode().equals(curCrd.node()); log.info("Initialize local node as mvcc coordinator [node=" + cctx.localNodeId() + @@ -744,6 +800,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager crdVer = topVer.topologyVersion(); + prevCrdQueries.init(activeQueries, discoCache, cctx.discovery()); + crdLatch.countDown(); } @@ -868,7 +926,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * */ - private class CacheCoordinatorDiscoveryListener implements GridLocalEventListener { + private class CacheCoordinatorNodeFailListener implements GridLocalEventListener { /** {@inheritDoc} */ @Override public void onEvent(Event evt) { assert evt instanceof DiscoveryEvent : evt; @@ -882,6 +940,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager for (WaitAckFuture fut : ackFuts.values()) fut.onNodeLeft(nodeId); + + prevCrdQueries.onNodeLeft(nodeId); } /** {@inheritDoc} */ @@ -930,6 +990,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg); else if (msg instanceof CoordinatorWaitTxsRequest) processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg); + else if (msg instanceof NewCoordinatorQueryAckRequest) + processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg); else U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java index 2affc5a..24ff354 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java @@ -27,39 +27,67 @@ public class MvccCoordinator { /** */ private final ClusterNode crd; + /** + * Unique coordinator version, increases when new coordinator is assigned, + * can differ from topVer if we decide to assign coordinator manually. + */ + private final long crdVer; + /** */ private final AffinityTopologyVersion topVer; - public MvccCoordinator(ClusterNode crd, final AffinityTopologyVersion topVer) { + /** + * @param crd Coordinator node. + * @param crdVer Coordinator version. + * @param topVer Topology version when coordinator was assigned. + */ + public MvccCoordinator(ClusterNode crd, long crdVer, AffinityTopologyVersion topVer) { this.crd = crd; + this.crdVer = crdVer; this.topVer = topVer; } + /** + * @return Unique coordinator version. + */ + public long coordinatorVersion() { + return crdVer; + } + + /** + * @return Coordinator node. + */ public ClusterNode node() { return crd; } + /** + * @return Topology version when coordinator was assigned. 
+ */ public AffinityTopologyVersion topologyVersion() { return topVer; } - @Override public boolean equals(Object other) { - if (this == other) + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; - if (other == null || getClass() != other.getClass()) + if (o == null || getClass() != o.getClass()) return false; - MvccCoordinator that = (MvccCoordinator)other; + MvccCoordinator that = (MvccCoordinator)o; - return topVer.equals(topVer) && crd.equals(that.crd); + return crdVer == that.crdVer; } + /** {@inheritDoc} */ @Override public int hashCode() { - int res = crd.hashCode(); - - res = 31 * res + topVer.hashCode(); + return (int)(crdVer ^ (crdVer >>> 32)); + } - return res; + /** {@inheritDoc} */ + @Override public String toString() { + return "MvccCoordinator [node=" + crd.id() + ", ver=" + crdVer + ", topVer=" + topVer + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java index 847822e..bec3301 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -32,19 +33,32 @@ public class MvccCounter implements Message { /** */ private 
long cntr; + /** + * + */ public MvccCounter() { // No-po. } + /** + * @param crdVer Coordinator version. + * @param cntr Counter. + */ public MvccCounter(long crdVer, long cntr) { this.crdVer = crdVer; this.cntr = cntr; } + /** + * @return Coordinator version. + */ public long coordinatorVersion() { return crdVer; } + /** + * @return Counter. + */ public long counter() { return cntr; } @@ -71,22 +85,70 @@ public class MvccCounter implements Message { /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - return false; + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("crdVer", crdVer)) + return false; + + writer.incrementState(); + + } + + return true; } /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return false; + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + crdVer = reader.readLong("crdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccCounter.class); } /** {@inheritDoc} */ @Override public short directType() { - return 0; + return 141; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 0; + return 2; } /** {@inheritDoc} */ @@ -94,7 +156,8 @@ public class MvccCounter implements Message { // No-op. 
} + /** {@inheritDoc} */ @Override public String toString() { - return super.toString(); + return S.toString(MvccCounter.class, this); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java new file mode 100644 index 0000000..40b8e01 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Message carrying a coordinator version and a query counter. {@link PreviousCoordinatorQueries#onQueryDone} + * combines the two into an {@link MvccCounter} and decrements the query count tracked for it under the given node ID. + */ +public class NewCoordinatorQueryAckRequest implements MvccCoordinatorMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Coordinator version. */ + private long crdVer; + + /** Query counter. */ + private long cntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public NewCoordinatorQueryAckRequest() { + // No-op. + } + + /** + * @param crdVer Coordinator version. + * @param cntr Query counter. + */ + NewCoordinatorQueryAckRequest(long crdVer, long cntr) { + this.crdVer = crdVer; + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Coordinator version. + */ + public long coordinatorVersion() { + return crdVer; + } + + /** + * @return Counter. 
+ */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("crdVer", crdVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + crdVer = reader.readLong("crdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(NewCoordinatorQueryAckRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 140; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NewCoordinatorQueryAckRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java new file mode 100644 index 0000000..dfe584e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.jetbrains.annotations.Nullable; + +/** + * Tracks per-node query counts keyed by {@link MvccCounter} and exposes, via {@link #previousQueriesDone()}, + * a flag that is assigned from the emptiness of that map. Judging by the class name these are queries started + * under the previous MVCC coordinator - TODO confirm against the caller. + * <p> + * Mutating entry points synchronize on {@code this}; {@link #previousQueriesDone()} reads a volatile flag without locking. + */ +class PreviousCoordinatorQueries { + /** Flag returned by {@link #previousQueriesDone()}; assigned from {@code activeQueries.isEmpty()}. */ + private volatile boolean prevQueriesDone; + + /** Node ID -> (counter -> query count); a count can be negative when {@link #onQueryDone} runs before {@code initDone} is set. */ + private final ConcurrentHashMap<UUID, Map<MvccCounter, Integer>> activeQueries = new ConcurrentHashMap<>(); + + /** Node IDs passed to {@link #processClientActiveQueries} before {@link #init} created {@code waitNodes}; {@code init} does not await them. */ + private Set<UUID> rcvd; + + /** IDs of the alive client nodes collected by {@link #init}; {@code null} until then. */ + private Set<UUID> waitNodes; + + /** Initialization flag, first assigned by {@link #init} from {@code waitNodes.isEmpty()}. */ + private boolean initDone; + + /** + * Builds {@code waitNodes} from the alive client nodes of {@code discoCache} that are not already in {@code rcvd}, + * merges the supplied queries and, when no node has to be awaited, recomputes the done flag. + * Asserts that {@code initDone} is unset and {@code waitNodes} is still {@code null}. + * + * @param srvNodesQueries Queries per node ID to merge, may be {@code null} (presumably collected from server nodes - confirm). + * @param discoCache Discovery cache whose {@code allNodes()} are scanned. + * @param mgr Discovery manager used for the {@code alive(node)} check. + */ + void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) { + synchronized (this) { + assert !initDone; + assert waitNodes == null; + + waitNodes = new HashSet<>(); + + for (ClusterNode node : discoCache.allNodes()) { + if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id())) + waitNodes.add(node.id()); + } + + initDone = waitNodes.isEmpty(); + + if (srvNodesQueries != null) { + for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet()) + addAwaitedActiveQueries(e.getKey(), e.getValue()); + } + + if (initDone) + prevQueriesDone = activeQueries.isEmpty(); + } + } + + /** + * @return Current value of the volatile {@code prevQueriesDone} flag. + */ + boolean previousQueriesDone() { + return prevQueriesDone; + } + + /** + * Merges {@code nodeQueries} into {@code activeQueries} and then assigns {@code activeQueries.isEmpty()} to + * {@code prevQueriesDone}. Does nothing if {@code nodeQueries} is {@code null} or the flag is already set. + * If the node has no entry yet, the passed map is stored as is (not copied); otherwise counts are summed per + * counter and counters whose sum is zero are removed, together with the node entry once its map is empty. + * <p> + * NOTE(review): the flag is assigned here regardless of {@code initDone}, so it can become {@code true} while + * nodes are still awaited - confirm this is intended. + * + * @param nodeId Node ID. + * @param nodeQueries Counter -> query count for the node, may be {@code null}. + */ + private void addAwaitedActiveQueries(UUID nodeId, Map<MvccCounter, Integer> nodeQueries) { + if (nodeQueries == null || prevQueriesDone) + return; + + Map<MvccCounter, Integer> queries = activeQueries.get(nodeId); + + if (queries == null) + 
activeQueries.put(nodeId, nodeQueries); + else { + for (Map.Entry<MvccCounter, Integer> e : nodeQueries.entrySet()) { + Integer qryCnt = queries.get(e.getKey()); + + int newQryCnt = (qryCnt == null ? 0 : qryCnt) + e.getValue(); + + if (newQryCnt == 0) { + queries.remove(e.getKey()); + + if (queries.isEmpty()) + activeQueries.remove(nodeId); + } + else + queries.put(e.getKey(), newQryCnt); + } + } + + prevQueriesDone = activeQueries.isEmpty(); + } + + /** + * Records the active queries reported for a node; ignored once {@code initDone} is set. Before {@link #init} + * has run the node ID is remembered in {@code rcvd}, afterwards it is removed from {@code waitNodes}. In both + * cases the queries are then merged through {@code addAwaitedActiveQueries}. + * <p> + * NOTE(review): {@code initDone} is assigned the result of {@code waitNodes.remove(nodeId)}, which is + * {@code true} whenever the ID was present - not only when the set became empty - so the first awaited node + * to respond ends the waiting; confirm whether {@code waitNodes.isEmpty()} was intended. + * + * @param nodeId Node ID (presumably a client node, per the method name - confirm). + * @param activeQueries Counter -> query count reported for the node, may be {@code null}; shadows the field of the same name. + */ + void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) { + synchronized (this) { + if (initDone) + return; + + if (waitNodes == null) { + if (rcvd == null) + rcvd = new HashSet<>(); + + rcvd.add(nodeId); + } + else + initDone = waitNodes.remove(nodeId); + + addAwaitedActiveQueries(nodeId, activeQueries); + } + } + + /** + * Assigns to {@code initDone} whether {@code nodeId} was removed from {@code waitNodes}. If it was, and the done + * flag is not yet set, the node's tracked queries are dropped and the flag is recomputed when an entry was removed. + * <p> + * NOTE(review): the unconditional assignment turns an already {@code true} {@code initDone} back to {@code false} + * when the node is not in {@code waitNodes}, and in that case the node's entry stays in {@code activeQueries} - + * confirm this is intended. + * + * @param nodeId Failed node ID. + */ + void onNodeLeft(UUID nodeId) { + synchronized (this) { + initDone = waitNodes != null && waitNodes.remove(nodeId); + + if (initDone && !prevQueriesDone && activeQueries.remove(nodeId) != null) + prevQueriesDone = activeQueries.isEmpty(); + } + } + + /** + * Decrements the query count stored for the counter built from the message's coordinator version and counter + * value, creating the node's map first if it is absent. With {@code initDone} set, a count that reaches zero is + * removed (together with an emptied node map) and the done flag is recomputed; any other result is not written + * back. Without {@code initDone} the decremented value - possibly negative - is stored. + * <p> + * NOTE(review): with {@code initDone} set and no prior entry for the node, the freshly created empty map stays + * in {@code activeQueries} - confirm this cannot keep the done flag from being set. + * + * @param nodeId Node ID. + * @param msg Message. + */ + void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) { + synchronized (this) { + MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter()); + + Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId); + + if (nodeQueries == null) + activeQueries.put(nodeId, nodeQueries = new HashMap<>()); + + Integer qryCnt = nodeQueries.get(cntr); + + int newQryCnt = (qryCnt != null ? 
qryCnt : 0) - 1; + + if (initDone) { + if (newQryCnt == 0) { + nodeQueries.remove(cntr); + + if (nodeQueries.isEmpty()) + activeQueries.remove(nodeId); + + prevQueriesDone = activeQueries.isEmpty(); + } + } + else + nodeQueries.put(cntr, newQryCnt); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index f5df18b..83e846f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet; @@ -535,13 +536,13 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage String clsName = qry.query().queryClassName(); // TODO IGNITE-3478. 
- final ClusterNode mvccCrd; + final MvccCoordinator mvccCrd; final MvccCoordinatorVersion mvccVer; if (cctx.mvccEnabled()) { - mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()).node(); + mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); - IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); + IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node()); mvccVer = fut0.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 67bd6e0..dda1e69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -825,7 +826,7 @@ public abstract 
class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @throws IgniteCheckedException If failed to get iterator. */ @SuppressWarnings({"unchecked"}) - private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode, ClusterNode mvccCrd) + private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode, MvccCoordinator mvccCrd) throws IgniteCheckedException { final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter(); @@ -1461,13 +1462,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - final ClusterNode mvccCrd; + final MvccCoordinator mvccCrd; // TODO IGNITE-3478. if (cctx.mvccEnabled()) { - mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()).node(); + mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); - IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); + IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node()); qry.mvccVersion(fut0.get()); } @@ -2915,7 +2916,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private IgniteCacheExpiryPolicy expiryPlc; /** */ - private ClusterNode mvccCrd; + private MvccCoordinator mvccCrd; /** */ private MvccCoordinatorVersion mvccVer; @@ -2938,7 +2939,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte IgniteBiPredicate<K, V> scanFilter, boolean locNode, GridCacheContext cctx, - ClusterNode mvccCrd, + MvccCoordinator mvccCrd, IgniteLogger log) { assert mvccCrd == null || qry.mvccVersion() != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/537a3ecb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 1b8a509..5c11a4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1671,20 +1671,35 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** - * TODO IGNITE-3478. - * * @throws Exception If failed. */ - public void testReadInProgressCoordinatorFails() throws Exception { + public void testReadInProgressCoordinatorFails_FromServer() throws Exception { + readInProgressCoordinatorFails(false); + } + + /** + * @throws Exception If failed. + */ + public void testReadInProgressCoordinatorFails_FromClient() throws Exception { + readInProgressCoordinatorFails(true); + } + + /** + * @param fromClient {@code True} if read from client node, otherwise from server node. + * @throws Exception If failed. + */ + private void readInProgressCoordinatorFails(boolean fromClient) throws Exception { testSpi = true; startGrids(4); client = true; - final Ignite client = startGrid(4); + assertTrue(startGrid(4).configuration().isClientMode()); + + final Ignite getNode = fromClient ? ignite(4) : ignite(1); - final IgniteCache cache = client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + final IgniteCache cache = getNode.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). 
setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1)))); final Set<Integer> keys = new HashSet<>(); @@ -1699,15 +1714,15 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (Integer key : keys) vals.put(key, -1); - try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.putAll(vals); tx.commit(); } - final TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client); + final TestRecordingCommunicationSpi getNodeSpi = TestRecordingCommunicationSpi.spi(getNode); - clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + getNodeSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { @Override public boolean apply(ClusterNode node, Message msg) { return msg instanceof GridNearGetRequest; } @@ -1734,17 +1749,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }, "get-thread"); - clientSpi.waitForBlocked(); + getNodeSpi.waitForBlocked(); final IgniteInternalFuture releaseWaitFut = GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { Thread.sleep(3000); - clientSpi.stopBlock(true); + getNodeSpi.stopBlock(true); return null; } - }, "get-thread"); + }, "stop-block"); stopGrid(0); @@ -1754,7 +1769,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (Integer key : keys) vals.put(key, i); - try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.putAll(vals); tx.commit(); @@ -1763,6 +1778,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { releaseWaitFut.get(); getFut.get(); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); } /** @@ -2197,15 +2215,34 @@ public class 
CacheMvccTransactionsTest extends GridCommonAbstractTest { private void checkActiveQueriesCleanup(Ignite node) throws Exception { final CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators(); - assertTrue(GridTestUtils.waitForCondition( + assertTrue("Active queries not empty", GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries"); return activeQrys.isEmpty(); } - }, 5000) + }, 5_000) + ); + assertTrue("Previous coordinator queries not empty", GridTestUtils.waitForCondition( + new GridAbsPredicate() { + @Override public boolean apply() { + Map prevCrdQueries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries"); + + return prevCrdQueries.isEmpty(); + } + }, 5_000) ); + + if (crd.currentCoordinatorNode().equals(node.cluster().localNode())) { + assertTrue("prevQueriesDone flag is not set", GridTestUtils.waitForCondition( + new GridAbsPredicate() { + @Override public boolean apply() { + return GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone"); + } + }, 5_000) + ); + } } /**
