Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 e0196b003 -> c964314a8


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c964314a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c964314a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c964314a

Branch: refs/heads/ignite-3479
Commit: c964314a8a145af5f9f9f5d0f186f7b569c5f346
Parents: e0196b0
Author: sboikov <[email protected]>
Authored: Thu Sep 28 11:21:00 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Sep 28 11:25:04 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 13 ++++
 .../discovery/GridDiscoveryManager.java         |  4 +
 .../GridDhtPartitionsExchangeFuture.java        | 21 ++----
 .../GridNearPessimisticTxPrepareFuture.java     |  2 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 78 +++++++++++++-------
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 59 +++++++++++++++
 6 files changed, 134 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 95e855a..b6cae3f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -81,6 +82,9 @@ public class DiscoCache {
     /** */
     private final AffinityTopologyVersion topVer;
 
+    /** */
+    private final MvccCoordinator mvccCrd;
+
     /**
      * @param topVer Topology version.
      * @param state Current cluster state.
@@ -99,6 +103,7 @@ public class DiscoCache {
         AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
+        MvccCoordinator mvccCrd,
         List<ClusterNode> rmtNodes,
         List<ClusterNode> allNodes,
         List<ClusterNode> srvNodes,
@@ -111,6 +116,7 @@ public class DiscoCache {
         this.topVer = topVer;
         this.state = state;
         this.loc = loc;
+        this.mvccCrd = mvccCrd;
         this.rmtNodes = rmtNodes;
         this.allNodes = allNodes;
         this.srvNodes = srvNodes;
@@ -136,6 +142,13 @@ public class DiscoCache {
     }
 
     /**
+     * @return Mvcc coordinator node.
+     */
+    @Nullable public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /**
      * @return Topology version.
      */
     public AffinityTopologyVersion version() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 527399d..584df82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
@@ -616,6 +617,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                 DiscoCache discoCache = null;
 
+                ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer);
+
                 boolean locJoinEvt = type == EVT_NODE_JOINED && 
node.id().equals(locNode.id());
 
                 IgniteInternalFuture<Boolean> transitionWaitFut = null;
@@ -2261,6 +2264,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             topVer,
             state,
             loc,
+            ctx.coordinators().discoveryData().coordinator(),
             Collections.unmodifiableList(rmtNodes),
             Collections.unmodifiableList(allNodes),
             Collections.unmodifiableList(srvNodes),

http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d93b359..01ec408 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -557,24 +557,14 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
-            boolean newMvccCrd = false;
+            MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator();
 
-            if (localJoinExchange()) {
-                MvccCoordinator mvccCrd = 
cctx.coordinators().currentCoordinator();
+            boolean mvccCrdChange = mvccCrd != null &&
+                initialVersion().equals(mvccCrd.topologyVersion());
 
-                if (mvccCrd == null) {
-                    newMvccCrd = 
cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null &&
-                        srvNodes.size() == 1;
-                }
-            }
-            else if (exchId.isLeft()){
-                MvccCoordinator mvccCrd = 
cctx.coordinators().currentCoordinator();
-
-                if (mvccCrd != null && 
mvccCrd.nodeId().equals(exchId.eventNode().id()))
-                    newMvccCrd = 
cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null;
-            }
+            cctx.kernalContext().coordinators().currentCoordinator(mvccCrd);
 
-            exchCtx = new ExchangeContext(crdNode, newMvccCrd, this);
+            exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this);
 
             assert state == null : state;
 
@@ -585,6 +575,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             if (exchLog.isInfoEnabled()) {
                 exchLog.info("Started exchange init [topVer=" + topVer +
+                    ", mvccCrd=" + mvccCrd +
                     ", crd=" + crdNode +
                     ", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) 
+
                     ", evtNode=" + firstDiscoEvt.eventNode().id() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 0664b1a..dbfea8b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -431,7 +431,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
         if (mvccCrd != null) {
             assert !tx.onePhaseCommit();
 
-            if (mvccCrd.equals(cctx.localNodeId())) {
+            if (mvccCrd.nodeId().equals(cctx.localNodeId())) {
                 MvccCoordinatorVersion mvccVer = 
cctx.coordinators().requestTxCounterOnCoordinator(tx);
 
                 onMvccResponse(cctx.localNodeId(), mvccVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 5f5da20..e2d2183 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -134,7 +136,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     }
 
     /** */
-    private CacheCoordinatorsDiscoveryData discoData;
+    private CacheCoordinatorsDiscoveryData discoData = new 
CacheCoordinatorsDiscoveryData(null);
 
     /**
      * @param ctx Context.
@@ -150,7 +152,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        Integer cmpId = DiscoveryDataExchangeType.CACHE_CRD_PROC.ordinal();
+        Integer cmpId = discoveryDataType().ordinal();
 
         if (!dataBag.commonDataCollectedFor(cmpId))
             dataBag.addGridCommonData(cmpId, discoData);
@@ -159,6 +161,43 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override public void 
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
         discoData = (CacheCoordinatorsDiscoveryData)data.commonData();
+
+        assert discoData != null;
+    }
+
+    /**
+     * @return Discovery data.
+     */
+    public CacheCoordinatorsDiscoveryData discoveryData() {
+        return discoData;
+    }
+
+    public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, 
long topVer) {
+        MvccCoordinator crd = discoData.coordinator();
+
+        if (crd == null ||
+            ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && 
!F.nodeIds(nodes).contains(crd.nodeId()))) {
+            ClusterNode crdNode = null;
+
+            // Expect nodes are sorted by order.
+            for (ClusterNode node : nodes) {
+                if (!CU.clientNode(node)) {
+                    crdNode = node;
+
+                    break;
+                }
+            }
+
+            crd = crdNode != null ? new
+                MvccCoordinator(crdNode.id(), topVer, new 
AffinityTopologyVersion(topVer, 0)) : null;
+
+            if (crd != null)
+                log.info("Assigned mvcc coordinator: " + crd);
+            else
+                U.warn(log, "New mvcc coordinator was not assigned [topVer=" + 
topVer + ']');
+
+            discoData = new CacheCoordinatorsDiscoveryData(crd);
+        }
     }
 
     /** {@inheritDoc} */
@@ -760,10 +799,20 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
         }
     }
 
+    /**
+     * @return
+     */
     public MvccCoordinator currentCoordinator() {
         return curCrd;
     }
 
+    public void currentCoordinator(MvccCoordinator curCrd) {
+        this.curCrd = curCrd;
+    }
+
+    /**
+     * @return
+     */
     public UUID currentCoordinatorId() {
         MvccCoordinator curCrd = this.curCrd;
 
@@ -785,31 +834,6 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param discoCache Discovery snapshot.
-     * @return New coordinator.
-     */
-    public MvccCoordinator reassignCoordinator(DiscoCache discoCache) {
-        assert curCrd == null || 
!F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd;
-
-        if (!discoCache.serverNodes().isEmpty()) {
-            ClusterNode node = discoCache.serverNodes().get(0);
-
-            curCrd = new MvccCoordinator(node.id(),
-                discoCache.version().topologyVersion(),
-                discoCache.version());
-
-            log.info("Assigned mvcc coordinator: " + curCrd);
-        }
-        else {
-            curCrd = null;
-
-            log.info("New mvcc coordinator was not assigned [topVer=" + 
discoCache.version() + ']');
-        }
-
-        return curCrd;
-    }
-
-    /**
      * @param nodeId Node ID
      * @param activeQueries
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c964314a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 03c514f..be7d44a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -68,6 +68,7 @@ import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -1805,6 +1806,8 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             startGrid(i + 1);
 
             checkPutGet(cacheNames);
+
+            checkCoordinatorsConsistency(null);
         }
 
         client = true;
@@ -1817,12 +1820,18 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 node.cache(cacheName);
 
             checkPutGet(cacheNames);
+
+            checkCoordinatorsConsistency(null);
         }
 
         for (int i = 0; i < 3; i++) {
             stopGrid(i);
 
+            awaitPartitionMapExchange();
+
             checkPutGet(cacheNames);
+
+            checkCoordinatorsConsistency(null);
         }
     }
 
@@ -1862,6 +1871,56 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testMvccCoordinatorInfoConsistency() throws Exception {
+        for (int i = 0; i < 4; i++) {
+            startGrid(i);
+
+            checkCoordinatorsConsistency(i + 1);
+        }
+
+        client = true;
+
+        startGrid(4);
+
+        checkCoordinatorsConsistency(5);
+
+        startGrid(5);
+
+        checkCoordinatorsConsistency(6);
+
+        client = false;
+
+        stopGrid(0);
+
+        checkCoordinatorsConsistency(5);
+    }
+
+    /**
+     * @param expNodes Expected nodes number.
+     */
+    private void checkCoordinatorsConsistency(@Nullable Integer expNodes) {
+        List<Ignite> nodes = G.allGrids();
+
+        if (expNodes != null)
+            assertEquals(expNodes, (Integer)nodes.size());
+
+        MvccCoordinator crd = null;
+
+        for (Ignite node : G.allGrids()) {
+            CacheCoordinatorsProcessor crdProc = ((IgniteKernal) 
node).context().cache().context().coordinators();
+
+            MvccCoordinator crd0 = crdProc.currentCoordinator();
+
+            if (crd != null)
+                assertEquals(crd, crd0);
+            else
+                crd = crd0;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testGetVersionRequestFailover() throws Exception {
         final int NODES = 5;
 

Reply via email to