Repository: ignite Updated Branches: refs/heads/ignite-3479 [created] 58fc58635
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58fc5863 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58fc5863 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58fc5863 Branch: refs/heads/ignite-3479 Commit: 58fc586357687a5b9022bc2c4f8e49b9e449559f Parents: 7a4baba Author: sboikov <[email protected]> Authored: Mon Sep 25 17:51:49 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 25 17:51:49 2017 +0300 ---------------------------------------------------------------------- .../processors/affinity/AffinityAssignment.java | 2 + .../affinity/GridAffinityAssignment.java | 14 +++- .../affinity/GridAffinityAssignmentCache.java | 18 ++++- .../affinity/GridAffinityProcessor.java | 2 +- .../processors/affinity/GridAffinityUtils.java | 2 +- .../affinity/HistoryAffinityAssignment.java | 11 ++- .../processors/cache/ExchangeContext.java | 19 ++++- .../cache/GridCacheAffinityManager.java | 4 ++ .../GridCachePartitionExchangeManager.java | 7 ++ .../distributed/dht/GridDhtCacheAdapter.java | 2 + .../distributed/dht/GridDhtTxFinishFuture.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 19 ++++- .../mvcc/CacheCoordinatorsSharedManager.java | 74 +++++++++++++------- .../mvcc/CoordinatorAssignmentHistory.java | 71 ------------------- .../processors/cache/mvcc/MvccQueryFuture.java | 27 +++++++ .../query/GridCacheDistributedQueryManager.java | 2 +- .../cache/query/GridCacheQueryManager.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 2 + 20 files changed, 169 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java index 06207d3..acb9213 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java @@ -85,4 +85,6 @@ public interface AffinityAssignment { * @return Backup partitions for specified node ID. */ public Set<Integer> backupPartitions(UUID nodeId); + + public ClusterNode mvccCoordinator(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 35130a3..2913930 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -39,6 +39,9 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable /** Topology version. */ private final AffinityTopologyVersion topVer; + /** */ + private final ClusterNode mvccCrd; + /** Collection of calculated affinity nodes. */ private List<List<ClusterNode>> assignment; @@ -69,6 +72,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable this.topVer = topVer; primary = new HashMap<>(); backup = new HashMap<>(); + mvccCrd = null; clientEvtChange = false; } @@ -79,7 +83,8 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable */ GridAffinityAssignment(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment, - List<List<ClusterNode>> idealAssignment) { + List<List<ClusterNode>> idealAssignment, + ClusterNode mvccCrd) { assert topVer != null; assert assignment != null; assert idealAssignment != null; @@ -87,6 +92,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable this.topVer = topVer; this.assignment = assignment; this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment; + this.mvccCrd = mvccCrd; primary = new HashMap<>(); backup = new HashMap<>(); @@ -106,6 +112,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable idealAssignment = aff.idealAssignment; primary = aff.primary; backup = aff.backup; + mvccCrd = aff.mvccCrd; clientEvtChange = true; } @@ -264,6 +271,11 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable } /** {@inheritDoc} */ + @Override public ClusterNode mvccCoordinator() { + return mvccCrd; + } + + /** {@inheritDoc} */ @Override public int hashCode() { return topVer.hashCode(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index f921251..4b0659c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -184,10 +184,22 @@ public class GridAffinityAssignmentCache { * @param affAssignment Affinity assignment for topology version. */ public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { + ClusterNode mvccCrd = ctx.cache().context().coordinators().currentCoordinatorForCacheAffinity(topVer); + + initialize(topVer, affAssignment, mvccCrd); + } + + /** + * Initializes affinity with given topology version and assignment. + * + * @param topVer Topology version. + * @param affAssignment Affinity assignment for topology version. + */ + private void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, ClusterNode mvccCrd) { assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']'; assert idealAssignment != null; - GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment); + GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment, mvccCrd); affCache.put(topVer, new HistoryAffinityAssignment(assignment)); head.set(assignment); @@ -570,7 +582,9 @@ public class GridAffinityAssignmentCache { idealAssignment(aff.idealAssignment()); - initialize(aff.lastVersion(), aff.assignments(aff.lastVersion())); + AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion()); + + initialize(aff.lastVersion(), assign.assignment(), assign.mvccCoordinator()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 9c9fb8f..3a142c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -384,7 +384,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { try { GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? (GridAffinityAssignment)assign0 : - new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator()); AffinityInfo info = new AffinityInfo( cctx.config().getAffinity(), http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java index abd5292..15d7e4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java @@ -184,7 +184,7 @@ class GridAffinityUtils { GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? (GridAffinityAssignment)assign0 : - new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator()); return F.t( affinityMessage(ctx, cctx.config().getAffinity()), http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java index e502dd5..cae3611 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java @@ -43,17 +43,26 @@ public class HistoryAffinityAssignment implements AffinityAssignment { /** */ private final boolean clientEvtChange; + /** */ + private final ClusterNode mvccCrd; + /** * @param assign Assignment. */ - public HistoryAffinityAssignment(GridAffinityAssignment assign) { + HistoryAffinityAssignment(GridAffinityAssignment assign) { this.topVer = assign.topologyVersion(); this.assignment = assign.assignment(); this.idealAssignment = assign.idealAssignment(); + this.mvccCrd = assign.mvccCoordinator(); this.clientEvtChange = assign.clientEventChange(); } /** {@inheritDoc} */ + @Override public ClusterNode mvccCoordinator() { + return mvccCrd; + } + + /** {@inheritDoc} */ @Override public boolean clientEventChange() { return clientEvtChange; } http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index 4046c98..c9f0744 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -51,17 +51,22 @@ public class ExchangeContext { /** */ private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false); + /** */ + private final boolean mvccCrdChange; + /** * @param crd Coordinator flag. * @param fut Exchange future. */ - public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) { + public ExchangeContext(boolean crd, boolean mvccCrdChange, GridDhtPartitionsExchangeFuture fut) { + this.mvccCrdChange = mvccCrdChange; + int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion()); if (compatibilityNode || (crd && fut.localJoinExchange())) { fetchAffOnJoin = true; - merge = false; + merge = !mvccCrdChange; } else { boolean startCaches = fut.exchangeId().isJoined() && @@ -69,7 +74,8 @@ public class ExchangeContext { fetchAffOnJoin = protocolVer == 1; - merge = !startCaches && + merge = !mvccCrdChange && + !startCaches && protocolVer > 1 && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT; } @@ -124,6 +130,13 @@ public class ExchangeContext { return merge; } + /** + * @return {@code True} if mvcc coordinator node is changed during this exchange. + */ + public boolean mvccCoordinatorChange() { + return mvccCrdChange; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ExchangeContext.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 702b848..a2407e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -238,6 +238,10 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { return aff0.cachedAffinity(topVer); } + public ClusterNode mvccCoordinator(AffinityTopologyVersion topVer) { + return assignment(topVer).mvccCoordinator(); + } + /** * @param key Key to check. * @param topVer Topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fe9ed29..b576789 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1949,6 +1949,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ClusterNode node = evt.eventNode(); + if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && + node.equals(cctx.coordinators().currentCoordinator())) { + if (log.isInfoEnabled()) + log.info("Stop merge, need exchange for mvcc coordinator failure: " + node); + + break; + } if (!curFut.context().supportsMergeExchanges(node)) { if (log.isInfoEnabled()) log.info("Stop merge, node does not support merge: " + node); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index ac04e4b..eaeef53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -1234,6 +1234,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (expVer.equals(curVer)) return false; + // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs. + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer); Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index dd00ad1..31f12b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -295,7 +295,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity GridLongList waitTxs = tx.mvccWaitTransactions(); if (waitTxs != null) { - ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + ClusterNode crd = cctx.coordinators().currentCoordinator(); assert crd != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 0fe17a8..0431224 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1234,7 +1234,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (req.requestMvccCounter()) { assert tx.txState().mvccEnabled(cctx); - ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + ClusterNode crd = cctx.coordinators().currentCoordinator(); assert crd != null : tx.topologyVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 9b7d733..11d81a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -154,7 +154,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda // TODO IGNITE-3478 (correct failover and remap). if (cctx.mvccEnabled()) { - mvccCrd = cctx.shared().coordinators().coordinator(topVer); + mvccCrd = cctx.affinity().mvccCoordinator(topVer); if (mvccCrd == null) { onDone(new ClusterTopologyCheckedException("Mvcc coordinator is not assigned: " + topVer)); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 03b7b6e..47e1f17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -552,7 +553,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); - exchCtx = new ExchangeContext(crdNode, this); + boolean mvccCrdChange = false; + + if (localJoinExchange()) + cctx.coordinators().reassignCoordinator(firstEvtDiscoCache); + else if (exchId.isLeft()){ + ClusterNode mvccCrd = cctx.coordinators().currentCoordinator(); + + if (mvccCrd != null && mvccCrd.equals(exchId.eventNode())) { + ClusterNode newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache); + + mvccCrdChange = !Objects.equals(mvccCrd, newMvccCrd); + } + } + + exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this); assert state == null : state; @@ -1418,8 +1433,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (err == null) { - cctx.coordinators().assignCoordinator(exchCtx.events().discoveryCache()); - if (centralizedAff) { assert !exchCtx.mergeExchanges(); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index c46a624..ea44df1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; @@ -72,7 +71,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private static final byte MSG_POLICY = SYSTEM_POOL; /** */ - private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory(); + private volatile Coordinator curCrd; /** */ private final AtomicLong mvccCntr = new AtomicLong(1L); @@ -144,7 +143,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @return Counter. */ public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) { - assert cctx.localNode().equals(assignHist.currentCoordinator()); + assert cctx.localNode().equals(currentCoordinator()); return assignTxCounter(tx.nearXidVersion(), 0L); } @@ -682,43 +681,48 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } } - /** - * @param topVer Topology version. - * @return MVCC coordinator for given topology version. - */ - @Nullable public ClusterNode coordinator(AffinityTopologyVersion topVer) { - return assignHist.coordinator(topVer); + public ClusterNode currentCoordinator() { + Coordinator crd = curCrd; + + return crd != null ? crd.crd : null; + } + + public ClusterNode currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) { + Coordinator crd = curCrd; + + assert crd == null || crd.topVer.compareTo(topVer) <= 0 : crd; + + return crd != null ? crd.crd : null; } /** * @param discoCache Discovery snapshot. */ - public void assignCoordinator(DiscoCache discoCache) { - ClusterNode curCrd = assignHist.currentCoordinator(); - - if (curCrd == null || !discoCache.allNodes().contains(curCrd)) { - ClusterNode newCrd = null; - - if (!discoCache.serverNodes().isEmpty()) - newCrd = discoCache.serverNodes().get(0); + public ClusterNode reassignCoordinator(DiscoCache discoCache) { + ClusterNode curCrd = currentCoordinator(); - if (!F.eq(curCrd, newCrd)) { - assignHist.addAssignment(discoCache.version(), newCrd); + assert curCrd == null || !discoCache.allNodes().contains(curCrd) : curCrd; - if (cctx.localNode().equals(newCrd)) { - crdVer = discoCache.version().topologyVersion(); + ClusterNode newCrd; - crdLatch.countDown(); - } + if (!discoCache.serverNodes().isEmpty()) { + newCrd = discoCache.serverNodes().get(0); - log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() + - ", crd=" + newCrd + ']'); + if (cctx.localNode().equals(newCrd)) { + crdVer = discoCache.version().topologyVersion(); - return; + crdLatch.countDown(); } + + log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() + + ", crd=" + newCrd + ']'); } + else + newCrd = null; + + this.curCrd = new Coordinator(newCrd, discoCache.version()); - assignHist.addAssignment(discoCache.version(), curCrd); + return newCrd; } /** @@ -996,4 +1000,20 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager this.txId = txId; } } + + /** + * + */ + static class Coordinator { + /** */ + final ClusterNode crd; + + /** */ + final AffinityTopologyVersion topVer; + + Coordinator(ClusterNode crd, AffinityTopologyVersion topVer) { + this.crd = crd; + this.topVer = topVer; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java deleted file mode 100644 index 40354a8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.lang.IgniteBiTuple; - -/** - * - */ -class CoordinatorAssignmentHistory { - /** */ - private volatile Map<AffinityTopologyVersion, ClusterNode> assignHist = Collections.emptyMap(); - - /** */ - private volatile IgniteBiTuple<AffinityTopologyVersion, ClusterNode> - cur = new IgniteBiTuple<>(AffinityTopologyVersion.NONE, null); - - void addAssignment(AffinityTopologyVersion topVer, ClusterNode crd) { - assert !assignHist.containsKey(topVer); - assert topVer.compareTo(cur.get1()) > 0; - - cur = new IgniteBiTuple<>(topVer, crd); - - Map<AffinityTopologyVersion, ClusterNode> hist = new HashMap<>(assignHist); - - hist.put(topVer, crd); - - assignHist = hist; - - } - - ClusterNode currentCoordinator() { - return cur.get2(); - } - - ClusterNode coordinator(AffinityTopologyVersion topVer) { - assert topVer.initialized() : topVer; - - IgniteBiTuple<AffinityTopologyVersion, ClusterNode> cur0 = cur; - - if (cur0.get1().equals(topVer)) - return cur0.get2(); - - Map<AffinityTopologyVersion, ClusterNode> assignHist0 = assignHist; - - assert assignHist.containsKey(topVer) : - "No coordinator assignment [topVer=" + topVer + ", curVer=" + cur0.get1() + ", hist=" + assignHist0.keySet() + ']'; - - return assignHist0.get(topVer); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java new file mode 100644 index 0000000..62160e1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.cluster.ClusterNode; + +/** + * + */ +public interface MvccQueryFuture { + void coordinatorChanged(ClusterNode node); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 3433b4f..3c43768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -539,7 +539,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final MvccCoordinatorVersion mvccVer; if (cctx.mvccEnabled()) { - mvccCrd = cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion()); + mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index b711a80..f46f8d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1465,7 +1465,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // TODO IGNITE-3478. if (cctx.mvccEnabled()) { - mvccCrd = cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion()); + mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index ef42a14..ff7de90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -605,6 +605,8 @@ public class IgniteTxHandler { if (expVer.equals(curVer)) return false; + // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs. + for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { GridCacheContext ctx = e.context();
