Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 [created] 58fc58635


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58fc5863
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58fc5863
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58fc5863

Branch: refs/heads/ignite-3479
Commit: 58fc586357687a5b9022bc2c4f8e49b9e449559f
Parents: 7a4baba
Author: sboikov <[email protected]>
Authored: Mon Sep 25 17:51:49 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Sep 25 17:51:49 2017 +0300

----------------------------------------------------------------------
 .../processors/affinity/AffinityAssignment.java |  2 +
 .../affinity/GridAffinityAssignment.java        | 14 +++-
 .../affinity/GridAffinityAssignmentCache.java   | 18 ++++-
 .../affinity/GridAffinityProcessor.java         |  2 +-
 .../processors/affinity/GridAffinityUtils.java  |  2 +-
 .../affinity/HistoryAffinityAssignment.java     | 11 ++-
 .../processors/cache/ExchangeContext.java       | 19 ++++-
 .../cache/GridCacheAffinityManager.java         |  4 ++
 .../GridCachePartitionExchangeManager.java      |  7 ++
 .../distributed/dht/GridDhtCacheAdapter.java    |  2 +
 .../distributed/dht/GridDhtTxFinishFuture.java  |  2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  2 +-
 .../dht/GridPartitionedGetFuture.java           |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 19 ++++-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 74 +++++++++++++-------
 .../mvcc/CoordinatorAssignmentHistory.java      | 71 -------------------
 .../processors/cache/mvcc/MvccQueryFuture.java  | 27 +++++++
 .../query/GridCacheDistributedQueryManager.java |  2 +-
 .../cache/query/GridCacheQueryManager.java      |  2 +-
 .../cache/transactions/IgniteTxHandler.java     |  2 +
 20 files changed, 169 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index 06207d3..acb9213 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -85,4 +85,6 @@ public interface AffinityAssignment {
      * @return Backup partitions for specified node ID.
      */
     public Set<Integer> backupPartitions(UUID nodeId);
+
+    public ClusterNode mvccCoordinator();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 35130a3..2913930 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -39,6 +39,9 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
     /** Topology version. */
     private final AffinityTopologyVersion topVer;
 
+    /** */
+    private final ClusterNode mvccCrd;
+
     /** Collection of calculated affinity nodes. */
     private List<List<ClusterNode>> assignment;
 
@@ -69,6 +72,7 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
         this.topVer = topVer;
         primary = new HashMap<>();
         backup = new HashMap<>();
+        mvccCrd = null;
         clientEvtChange = false;
     }
 
@@ -79,7 +83,8 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
      */
     GridAffinityAssignment(AffinityTopologyVersion topVer,
         List<List<ClusterNode>> assignment,
-        List<List<ClusterNode>> idealAssignment) {
+        List<List<ClusterNode>> idealAssignment,
+        ClusterNode mvccCrd) {
         assert topVer != null;
         assert assignment != null;
         assert idealAssignment != null;
@@ -87,6 +92,7 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
         this.topVer = topVer;
         this.assignment = assignment;
         this.idealAssignment = idealAssignment.equals(assignment) ? assignment 
: idealAssignment;
+        this.mvccCrd = mvccCrd;
 
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -106,6 +112,7 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
         idealAssignment = aff.idealAssignment;
         primary = aff.primary;
         backup = aff.backup;
+        mvccCrd = aff.mvccCrd;
 
         clientEvtChange = true;
     }
@@ -264,6 +271,11 @@ public class GridAffinityAssignment implements 
AffinityAssignment, Serializable
     }
 
     /** {@inheritDoc} */
+    @Override public ClusterNode mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public int hashCode() {
         return topVer.hashCode();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index f921251..4b0659c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -184,10 +184,22 @@ public class GridAffinityAssignmentCache {
      * @param affAssignment Affinity assignment for topology version.
      */
     public void initialize(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> affAssignment) {
+        ClusterNode mvccCrd = 
ctx.cache().context().coordinators().currentCoordinatorForCacheAffinity(topVer);
+
+        initialize(topVer, affAssignment, mvccCrd);
+    }
+
+    /**
+     * Initializes affinity with given topology version and assignment.
+     *
+     * @param topVer Topology version.
+     * @param affAssignment Affinity assignment for topology version.
+     */
+    private void initialize(AffinityTopologyVersion topVer, 
List<List<ClusterNode>> affAssignment, ClusterNode mvccCrd) {
         assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + 
", last=" + lastVersion() + ']';
         assert idealAssignment != null;
 
-        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, 
affAssignment, idealAssignment);
+        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, 
affAssignment, idealAssignment, mvccCrd);
 
         affCache.put(topVer, new HistoryAffinityAssignment(assignment));
         head.set(assignment);
@@ -570,7 +582,9 @@ public class GridAffinityAssignmentCache {
 
         idealAssignment(aff.idealAssignment());
 
-        initialize(aff.lastVersion(), aff.assignments(aff.lastVersion()));
+        AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion());
+
+        initialize(aff.lastVersion(), assign.assignment(), 
assign.mvccCoordinator());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 9c9fb8f..3a142c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -384,7 +384,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             try {
                 GridAffinityAssignment assign = assign0 instanceof 
GridAffinityAssignment ?
                     (GridAffinityAssignment)assign0 :
-                    new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment());
+                    new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment(), assign0.mvccCoordinator());
 
                 AffinityInfo info = new AffinityInfo(
                     cctx.config().getAffinity(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index abd5292..15d7e4e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -184,7 +184,7 @@ class GridAffinityUtils {
 
             GridAffinityAssignment assign = assign0 instanceof 
GridAffinityAssignment ?
                 (GridAffinityAssignment)assign0 :
-                new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment());
+                new GridAffinityAssignment(topVer, assign0.assignment(), 
assign0.idealAssignment(), assign0.mvccCoordinator());
 
             return F.t(
                 affinityMessage(ctx, cctx.config().getAffinity()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index e502dd5..cae3611 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -43,17 +43,26 @@ public class HistoryAffinityAssignment implements 
AffinityAssignment {
     /** */
     private final boolean clientEvtChange;
 
+    /** */
+    private final ClusterNode mvccCrd;
+
     /**
      * @param assign Assignment.
      */
-    public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+    HistoryAffinityAssignment(GridAffinityAssignment assign) {
         this.topVer = assign.topologyVersion();
         this.assignment = assign.assignment();
         this.idealAssignment = assign.idealAssignment();
+        this.mvccCrd = assign.mvccCoordinator();
         this.clientEvtChange = assign.clientEventChange();
     }
 
     /** {@inheritDoc} */
+    @Override public ClusterNode mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean clientEventChange() {
         return clientEvtChange;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 4046c98..c9f0744 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -51,17 +51,22 @@ public class ExchangeContext {
     /** */
     private final boolean compatibilityNode = 
getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
 
+    /** */
+    private final boolean mvccCrdChange;
+
     /**
      * @param crd Coordinator flag.
      * @param fut Exchange future.
      */
-    public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
+    public ExchangeContext(boolean crd, boolean mvccCrdChange, 
GridDhtPartitionsExchangeFuture fut) {
+        this.mvccCrdChange = mvccCrdChange;
+
         int protocolVer = 
exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
 
         if (compatibilityNode || (crd && fut.localJoinExchange())) {
             fetchAffOnJoin = true;
 
-            merge = false;
+            merge = !mvccCrdChange;
         }
         else {
             boolean startCaches = fut.exchangeId().isJoined() &&
@@ -69,7 +74,8 @@ public class ExchangeContext {
 
             fetchAffOnJoin = protocolVer == 1;
 
-            merge = !startCaches &&
+            merge = !mvccCrdChange &&
+                !startCaches &&
                 protocolVer > 1 &&
                 fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
         }
@@ -124,6 +130,13 @@ public class ExchangeContext {
         return merge;
     }
 
+    /**
+     * @return {@code True} if mvcc coordinator node is changed during this 
exchange.
+     */
+    public boolean mvccCoordinatorChange() {
+        return mvccCrdChange;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ExchangeContext.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 702b848..a2407e5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -238,6 +238,10 @@ public class GridCacheAffinityManager extends 
GridCacheManagerAdapter {
         return aff0.cachedAffinity(topVer);
     }
 
+    public ClusterNode mvccCoordinator(AffinityTopologyVersion topVer) {
+        return assignment(topVer).mvccCoordinator();
+    }
+
     /**
      * @param key Key to check.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index fe9ed29..b576789 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1949,6 +1949,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                     ClusterNode node = evt.eventNode();
 
+                    if ((evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT) &&
+                        node.equals(cctx.coordinators().currentCoordinator())) 
{
+                        if (log.isInfoEnabled())
+                            log.info("Stop merge, need exchange for mvcc 
coordinator failure: " + node);
+
+                        break;
+                    }
                     if (!curFut.context().supportsMergeExchanges(node)) {
                         if (log.isInfoEnabled())
                             log.info("Stop merge, node does not support merge: 
" + node);

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index ac04e4b..eaeef53 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1234,6 +1234,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
         if (expVer.equals(curVer))
             return false;
 
+        // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs.
+
         Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
         Collection<ClusterNode> cacheNodes1 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index dd00ad1..31f12b5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -295,7 +295,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
         GridLongList waitTxs = tx.mvccWaitTransactions();
 
         if (waitTxs != null) {
-            ClusterNode crd = 
cctx.coordinators().coordinator(tx.topologyVersion());
+            ClusterNode crd = cctx.coordinators().currentCoordinator();
 
             assert crd != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 0fe17a8..0431224 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1234,7 +1234,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
             if (req.requestMvccCounter()) {
                 assert tx.txState().mvccEnabled(cctx);
 
-                ClusterNode crd = 
cctx.coordinators().coordinator(tx.topologyVersion());
+                ClusterNode crd = cctx.coordinators().currentCoordinator();
 
                 assert crd != null : tx.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 9b7d733..11d81a3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -154,7 +154,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
 
         // TODO IGNITE-3478 (correct failover and remap).
         if (cctx.mvccEnabled()) {
-            mvccCrd = cctx.shared().coordinators().coordinator(topVer);
+            mvccCrd = cctx.affinity().mvccCoordinator(topVer);
 
             if (mvccCrd == null) {
                 onDone(new ClusterTopologyCheckedException("Mvcc coordinator 
is not assigned: " + topVer));

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 03b7b6e..47e1f17 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -552,7 +553,21 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
-            exchCtx = new ExchangeContext(crdNode, this);
+            boolean mvccCrdChange = false;
+
+            if (localJoinExchange())
+                cctx.coordinators().reassignCoordinator(firstEvtDiscoCache);
+            else if (exchId.isLeft()){
+                ClusterNode mvccCrd = cctx.coordinators().currentCoordinator();
+
+                if (mvccCrd != null && mvccCrd.equals(exchId.eventNode())) {
+                    ClusterNode newMvccCrd = 
cctx.coordinators().reassignCoordinator(firstEvtDiscoCache);
+
+                    mvccCrdChange = !Objects.equals(mvccCrd, newMvccCrd);
+                }
+            }
+
+            exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this);
 
             assert state == null : state;
 
@@ -1418,8 +1433,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
 
         if (err == null) {
-            
cctx.coordinators().assignCoordinator(exchCtx.events().discoveryCache());
-
             if (centralizedAff) {
                 assert !exchCtx.mergeExchanges();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index c46a624..ea44df1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
@@ -72,7 +71,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
     private static final byte MSG_POLICY = SYSTEM_POOL;
     
     /** */
-    private final CoordinatorAssignmentHistory assignHist = new 
CoordinatorAssignmentHistory();
+    private volatile Coordinator curCrd;
 
     /** */
     private final AtomicLong mvccCntr = new AtomicLong(1L);
@@ -144,7 +143,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
      * @return Counter.
      */
     public MvccCoordinatorVersion 
requestTxCounterOnCoordinator(IgniteInternalTx tx) {
-        assert cctx.localNode().equals(assignHist.currentCoordinator());
+        assert cctx.localNode().equals(currentCoordinator());
 
         return assignTxCounter(tx.nearXidVersion(), 0L);
     }
@@ -682,43 +681,48 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         }
     }
 
-    /**
-     * @param topVer Topology version.
-     * @return MVCC coordinator for given topology version.
-     */
-    @Nullable public ClusterNode coordinator(AffinityTopologyVersion topVer) {
-        return assignHist.coordinator(topVer);
+    public ClusterNode currentCoordinator() {
+        Coordinator crd = curCrd;
+
+        return crd != null ? crd.crd : null;
+    }
+
+    public ClusterNode 
currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
+        Coordinator crd = curCrd;
+
+        assert crd == null || crd.topVer.compareTo(topVer) <= 0 : crd;
+
+        return crd != null ? crd.crd : null;
     }
 
     /**
      * @param discoCache Discovery snapshot.
      */
-    public void assignCoordinator(DiscoCache discoCache) {
-        ClusterNode curCrd = assignHist.currentCoordinator();
-
-        if (curCrd == null || !discoCache.allNodes().contains(curCrd)) {
-            ClusterNode newCrd = null;
-
-            if (!discoCache.serverNodes().isEmpty())
-                newCrd = discoCache.serverNodes().get(0);
+    public ClusterNode reassignCoordinator(DiscoCache discoCache) {
+        ClusterNode curCrd = currentCoordinator();
 
-            if (!F.eq(curCrd, newCrd)) {
-                assignHist.addAssignment(discoCache.version(), newCrd);
+        assert curCrd == null || !discoCache.allNodes().contains(curCrd) : 
curCrd;
 
-                if (cctx.localNode().equals(newCrd)) {
-                    crdVer = discoCache.version().topologyVersion();
+        ClusterNode newCrd;
 
-                    crdLatch.countDown();
-                }
+        if (!discoCache.serverNodes().isEmpty()) {
+            newCrd = discoCache.serverNodes().get(0);
 
-                log.info("Assigned mvcc coordinator [topVer=" + 
discoCache.version() +
-                    ", crd=" + newCrd + ']');
+            if (cctx.localNode().equals(newCrd)) {
+                crdVer = discoCache.version().topologyVersion();
 
-                return;
+                crdLatch.countDown();
             }
+
+            log.info("Assigned mvcc coordinator [topVer=" + 
discoCache.version() +
+                ", crd=" + newCrd + ']');
         }
+        else
+            newCrd = null;
+
+        this.curCrd = new Coordinator(newCrd, discoCache.version());
 
-        assignHist.addAssignment(discoCache.version(), curCrd);
+        return newCrd;
     }
 
     /**
@@ -996,4 +1000,20 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
             this.txId = txId;
         }
     }
+
+    /**
+     *
+     */
+    static class Coordinator {
+        /** */
+        final ClusterNode crd;
+
+        /** */
+        final AffinityTopologyVersion topVer;
+
+        Coordinator(ClusterNode crd, AffinityTopologyVersion topVer) {
+            this.crd = crd;
+            this.topVer = topVer;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
deleted file mode 100644
index 40354a8..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.lang.IgniteBiTuple;
-
-/**
- *
- */
-class CoordinatorAssignmentHistory {
-    /** */
-    private volatile Map<AffinityTopologyVersion, ClusterNode> assignHist = 
Collections.emptyMap();
-
-    /** */
-    private volatile IgniteBiTuple<AffinityTopologyVersion, ClusterNode>
-        cur = new IgniteBiTuple<>(AffinityTopologyVersion.NONE, null);
-
-    void addAssignment(AffinityTopologyVersion topVer, ClusterNode crd) {
-        assert !assignHist.containsKey(topVer);
-        assert topVer.compareTo(cur.get1()) > 0;
-
-        cur = new IgniteBiTuple<>(topVer, crd);
-
-        Map<AffinityTopologyVersion, ClusterNode> hist = new 
HashMap<>(assignHist);
-
-        hist.put(topVer, crd);
-
-        assignHist = hist;
-
-    }
-
-    ClusterNode currentCoordinator() {
-        return cur.get2();
-    }
-
-    ClusterNode coordinator(AffinityTopologyVersion topVer) {
-        assert topVer.initialized() : topVer;
-
-        IgniteBiTuple<AffinityTopologyVersion, ClusterNode> cur0 = cur;
-
-        if (cur0.get1().equals(topVer))
-            return cur0.get2();
-
-        Map<AffinityTopologyVersion, ClusterNode> assignHist0 = assignHist;
-
-        assert assignHist.containsKey(topVer) :
-            "No coordinator assignment [topVer=" + topVer + ", curVer=" + 
cur0.get1() + ", hist=" + assignHist0.keySet() + ']';
-
-        return assignHist0.get(topVer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
new file mode 100644
index 0000000..62160e1
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ *
+ */
+public interface MvccQueryFuture {
+    void coordinatorChanged(ClusterNode node);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 3433b4f..3c43768 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -539,7 +539,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
             final MvccCoordinatorVersion mvccVer;
 
             if (cctx.mvccEnabled()) {
-                mvccCrd = 
cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion());
+                mvccCrd = 
cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
                 IgniteInternalFuture<MvccCoordinatorVersion> fut0 = 
cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index b711a80..f46f8d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1465,7 +1465,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
             // TODO IGNITE-3478.
             if (cctx.mvccEnabled()) {
-                mvccCrd = 
cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion());
+                mvccCrd = 
cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
                 IgniteInternalFuture<MvccCoordinatorVersion> fut0 = 
cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58fc5863/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ef42a14..ff7de90 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -605,6 +605,8 @@ public class IgniteTxHandler {
         if (expVer.equals(curVer))
             return false;
 
+        // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs.
+
         for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
             GridCacheContext ctx = e.context();
 

Reply via email to