Repository: ignite Updated Branches: refs/heads/ignite-3479 e0196b003 -> c964314a8
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c964314a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c964314a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c964314a Branch: refs/heads/ignite-3479 Commit: c964314a8a145af5f9f9f5d0f186f7b569c5f346 Parents: e0196b0 Author: sboikov <[email protected]> Authored: Thu Sep 28 11:21:00 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Sep 28 11:25:04 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 13 ++++ .../discovery/GridDiscoveryManager.java | 4 + .../GridDhtPartitionsExchangeFuture.java | 21 ++---- .../GridNearPessimisticTxPrepareFuture.java | 2 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 78 +++++++++++++------- .../cache/mvcc/CacheMvccTransactionsTest.java | 59 +++++++++++++++ 6 files changed, 134 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 95e855a..b6cae3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -81,6 +82,9 @@ public class DiscoCache { /** */ private final AffinityTopologyVersion topVer; + /** */ + private final MvccCoordinator mvccCrd; + /** * @param topVer Topology version. * @param state Current cluster state. @@ -99,6 +103,7 @@ public class DiscoCache { AffinityTopologyVersion topVer, DiscoveryDataClusterState state, ClusterNode loc, + MvccCoordinator mvccCrd, List<ClusterNode> rmtNodes, List<ClusterNode> allNodes, List<ClusterNode> srvNodes, @@ -111,6 +116,7 @@ public class DiscoCache { this.topVer = topVer; this.state = state; this.loc = loc; + this.mvccCrd = mvccCrd; this.rmtNodes = rmtNodes; this.allNodes = allNodes; this.srvNodes = srvNodes; @@ -136,6 +142,13 @@ public class DiscoCache { } /** + * @return Mvcc coordinator node. + */ + @Nullable public MvccCoordinator mvccCoordinator() { + return mvccCrd; + } + + /** * @return Topology version. */ public AffinityTopologyVersion version() { http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 527399d..584df82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; @@ -616,6 +617,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { DiscoCache discoCache = null; + ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer); + boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id()); IgniteInternalFuture<Boolean> transitionWaitFut = null; @@ -2261,6 +2264,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topVer, state, loc, + ctx.coordinators().discoveryData().coordinator(), Collections.unmodifiableList(rmtNodes), Collections.unmodifiableList(allNodes), Collections.unmodifiableList(srvNodes), http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d93b359..01ec408 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -557,24 +557,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); - boolean newMvccCrd = false; + MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator(); - if (localJoinExchange()) { - MvccCoordinator mvccCrd = cctx.coordinators().currentCoordinator(); + boolean mvccCrdChange = mvccCrd != null && + initialVersion().equals(mvccCrd.topologyVersion()); - if (mvccCrd == null) { - newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null && - srvNodes.size() == 1; - } - } - else if (exchId.isLeft()){ - MvccCoordinator mvccCrd = cctx.coordinators().currentCoordinator(); - - if (mvccCrd != null && mvccCrd.nodeId().equals(exchId.eventNode().id())) - newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null; - } + cctx.kernalContext().coordinators().currentCoordinator(mvccCrd); - exchCtx = new ExchangeContext(crdNode, newMvccCrd, this); + exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this); assert state == null : state; @@ -585,6 +575,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (exchLog.isInfoEnabled()) { exchLog.info("Started exchange init [topVer=" + topVer + + ", mvccCrd=" + mvccCrd + ", crd=" + crdNode + ", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) + ", evtNode=" + firstDiscoEvt.eventNode().id() + http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 0664b1a..dbfea8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -431,7 +431,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (mvccCrd != null) { assert !tx.onePhaseCommit(); - if (mvccCrd.equals(cctx.localNodeId())) { + if (mvccCrd.nodeId().equals(cctx.localNodeId())) { MvccCoordinatorVersion mvccVer = cctx.coordinators().requestTxCounterOnCoordinator(tx); onMvccResponse(cctx.localNodeId(), mvccVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index 5f5da20..e2d2183 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; +import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -49,6 +50,7 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -134,7 +136,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** */ - private CacheCoordinatorsDiscoveryData discoData; + private CacheCoordinatorsDiscoveryData discoData = new CacheCoordinatorsDiscoveryData(null); /** * @param ctx Context. @@ -150,7 +152,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - Integer cmpId = DiscoveryDataExchangeType.CACHE_CRD_PROC.ordinal(); + Integer cmpId = discoveryDataType().ordinal(); if (!dataBag.commonDataCollectedFor(cmpId)) dataBag.addGridCommonData(cmpId, discoData); @@ -159,6 +161,43 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { discoData = (CacheCoordinatorsDiscoveryData)data.commonData(); + + assert discoData != null; + } + + /** + * @return Discovery data. + */ + public CacheCoordinatorsDiscoveryData discoveryData() { + return discoData; + } + + public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) { + MvccCoordinator crd = discoData.coordinator(); + + if (crd == null || + ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) { + ClusterNode crdNode = null; + + // Expect nodes are sorted by order. + for (ClusterNode node : nodes) { + if (!CU.clientNode(node)) { + crdNode = node; + + break; + } + } + + crd = crdNode != null ? new + MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null; + + if (crd != null) + log.info("Assigned mvcc coordinator: " + crd); + else + U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); + + discoData = new CacheCoordinatorsDiscoveryData(crd); + } } /** {@inheritDoc} */ @@ -760,10 +799,20 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } } + /** + * @return + */ public MvccCoordinator currentCoordinator() { return curCrd; } + public void currentCoordinator(MvccCoordinator curCrd) { + this.curCrd = curCrd; + } + + /** + * @return + */ public UUID currentCoordinatorId() { MvccCoordinator curCrd = this.curCrd; @@ -785,31 +834,6 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** - * @param discoCache Discovery snapshot. - * @return New coordinator. - */ - public MvccCoordinator reassignCoordinator(DiscoCache discoCache) { - assert curCrd == null || !F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd; - - if (!discoCache.serverNodes().isEmpty()) { - ClusterNode node = discoCache.serverNodes().get(0); - - curCrd = new MvccCoordinator(node.id(), - discoCache.version().topologyVersion(), - discoCache.version()); - - log.info("Assigned mvcc coordinator: " + curCrd); - } - else { - curCrd = null; - - log.info("New mvcc coordinator was not assigned [topVer=" + discoCache.version() + ']'); - } - - return curCrd; - } - - /** * @param nodeId Node ID * @param activeQueries */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 03c514f..be7d44a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -68,6 +68,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -1805,6 +1806,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { startGrid(i + 1); checkPutGet(cacheNames); + + checkCoordinatorsConsistency(null); } client = true; @@ -1817,12 +1820,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { node.cache(cacheName); checkPutGet(cacheNames); + + checkCoordinatorsConsistency(null); } for (int i = 0; i < 3; i++) { stopGrid(i); + awaitPartitionMapExchange(); + checkPutGet(cacheNames); + + checkCoordinatorsConsistency(null); } } @@ -1862,6 +1871,56 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testMvccCoordinatorInfoConsistency() throws Exception { + for (int i = 0; i < 4; i++) { + startGrid(i); + + checkCoordinatorsConsistency(i + 1); + } + + client = true; + + startGrid(4); + + checkCoordinatorsConsistency(5); + + startGrid(5); + + checkCoordinatorsConsistency(6); + + client = false; + + stopGrid(0); + + checkCoordinatorsConsistency(5); + } + + /** + * @param expNodes Expected nodes number. + */ + private void checkCoordinatorsConsistency(@Nullable Integer expNodes) { + List<Ignite> nodes = G.allGrids(); + + if (expNodes != null) + assertEquals(expNodes, (Integer)nodes.size()); + + MvccCoordinator crd = null; + + for (Ignite node : G.allGrids()) { + CacheCoordinatorsProcessor crdProc = ((IgniteKernal) node).context().cache().context().coordinators(); + + MvccCoordinator crd0 = crdProc.currentCoordinator(); + + if (crd != null) + assertEquals(crd, crd0); + else + crd = crd0; + } + } + + /** + * @throws Exception If failed. + */ public void testGetVersionRequestFailover() throws Exception { final int NODES = 5;
