pending

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a8b66f3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a8b66f3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a8b66f3

Branch: refs/heads/ignite-10537
Commit: 3a8b66f31a0e7efa66c522fab25d55097c61a87b
Parents: 7301ada
Author: Igor Seliverstov <[email protected]>
Authored: Sun Dec 9 22:30:45 2018 +0300
Committer: Igor Seliverstov <[email protected]>
Committed: Sun Dec 9 22:30:45 2018 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   4 +-
 .../processors/cache/GridCacheAdapter.java      |   6 +
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../processors/cache/mvcc/MvccCoordinator.java  |  54 ++--
 .../cache/mvcc/MvccDiscoveryData.java           |  52 ----
 .../processors/cache/mvcc/MvccProcessor.java    |  24 +-
 .../cache/mvcc/MvccProcessorImpl.java           | 282 +++++++++----------
 .../cache/mvcc/MvccQueryTrackerImpl.java        |   6 +-
 .../processors/cache/mvcc/MvccUtils.java        |   2 +-
 .../ignite/internal/util/GridLongList.java      |   1 +
 10 files changed, 191 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 5abe63c..04a4c5e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -785,7 +785,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     discoWrk.discoCache = discoCache;
 
                     if (!isLocDaemon && !ctx.clientDisconnected()) {
-                        
ctx.cache().context().coordinators().onLocalJoin(discoEvt);
+                        
ctx.cache().context().coordinators().onLocalJoin(discoEvt, discoCache);
 
                         ctx.cache().context().exchange().onLocalJoin(discoEvt, 
discoCache);
 
@@ -845,6 +845,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                     ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
 
+                    
ctx.cache().context().coordinators().onLocalJoin(localJoinEvent(), discoCache);
+
                     
ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache);
 
                     ctx.cluster().clientReconnectFuture().listen(new 
CI1<IgniteFuture<?>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1cd94d2..4ac9774 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4316,6 +4316,12 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                             AffinityTopologyVersion awaitVer = new 
AffinityTopologyVersion(
                                 topVer.topologyVersion() + 1, 0);
 
+                            U.warn(log, X.getFullStackTrace(e));
+
+                            U.warn(log,"txTopVer=" + topVer.topologyVersion() 
+ ", waitVer=" + awaitVer);
+
+                            U.warn(log, "curCrd=" + 
ctx.shared().coordinators().currentCoordinator());
+
                             
ctx.shared().exchange().affinityReadyFuture(awaitVer).get();
 
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ffc55a9..7849f1e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2070,7 +2070,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             tryToPerformLocalSnapshotOperation();
 
         if (err == null)
-            
cctx.coordinators().onExchangeDone(exchCtx.events().discoveryCache());
+            cctx.coordinators().onExchangeDone(initialVersion(), res, 
exchCtx.events().discoveryCache());
 
         // Create and destory caches and cache proxies.
         cctx.cache().onExchangeDone(initialVersion(), exchActions, err);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
index 045177a..a9dd18e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
@@ -31,38 +31,46 @@ public class MvccCoordinator implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    @GridToStringInclude
+    private final AffinityTopologyVersion topVer;
+
+    /** */
     private final UUID nodeId;
 
     /**
      * Unique coordinator version, increases when new coordinator is assigned,
      * can differ from topVer if we decide to assign coordinator manually.
      */
-    private final long crdVer;
+    private final long ver;
 
     /** */
-    @GridToStringInclude
-    private final AffinityTopologyVersion topVer;
+    private final boolean local;
 
-    /**
-     * @param nodeId Coordinator node ID.
-     * @param crdVer Coordinator version.
+    /**  
      * @param topVer Topology version when coordinator was assigned.
      */
-    public MvccCoordinator(UUID nodeId, long crdVer, AffinityTopologyVersion 
topVer) {
-        assert nodeId != null;
-        assert crdVer > 0 : crdVer;
-        assert topVer != null;
+    public MvccCoordinator(AffinityTopologyVersion topVer) {
+        this(topVer, null, 0, false);
+    }
 
+    /**
+     * @param topVer Topology version when coordinator was assigned.
+     * @param nodeId Coordinator node ID.
+     * @param ver Coordinator version.
+     */
+    public MvccCoordinator(AffinityTopologyVersion topVer, UUID nodeId, long 
ver,
+        boolean local) {
         this.nodeId = nodeId;
-        this.crdVer = crdVer;
+        this.ver = ver;
         this.topVer = topVer;
+        this.local = local;
     }
 
     /**
-     * @return Unique coordinator version.
+     * @return Topology version when coordinator was assigned.
      */
-    public long coordinatorVersion() {
-        return crdVer;
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
     }
 
     /**
@@ -73,10 +81,18 @@ public class MvccCoordinator implements Serializable {
     }
 
     /**
-     * @return Topology version when coordinator was assigned.
+     * @return Unique coordinator version.
      */
-    public AffinityTopologyVersion topologyVersion() {
-        return topVer;
+    public long version() {
+        return ver;
+    }
+
+    /**
+     *
+     * @return {@code True} if the coordinator is local.
+     */
+    public boolean local() {
+        return local;
     }
 
     /** {@inheritDoc} */
@@ -89,12 +105,12 @@ public class MvccCoordinator implements Serializable {
 
         MvccCoordinator that = (MvccCoordinator)o;
 
-        return crdVer == that.crdVer;
+        return ver == that.ver;
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        return (int)(crdVer ^ (crdVer >>> 32));
+        return (int)(ver ^ (ver >>> 32));
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java
deleted file mode 100644
index d2e936f..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.io.Serializable;
-
-/**
- * MVCC discovery data to be shared between nodes on join.
- */
-public class MvccDiscoveryData implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Current coordinator. */
-    private MvccCoordinator crd;
-
-    /**
-     * @param crd Coordinator.
-     */
-    public MvccDiscoveryData(MvccCoordinator crd) {
-        this.crd = crd;
-    }
-
-    /**
-     * @return Current coordinator.
-     */
-    public MvccCoordinator coordinator() {
-        return crd;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(MvccDiscoveryData.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index 161342f..5a648b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -27,9 +26,10 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.GridProcessor;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.GridLongList;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,30 +41,22 @@ public interface MvccProcessor extends GridProcessor {
      *
      * @param evt Discovery event.
      */
-    void onLocalJoin(DiscoveryEvent evt);
+    void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache);
 
     /**
      * Exchange done callback.
      *
+     * @param initialVersion Initial exchange version.
+     * @param resultVersion Result exchange version, differs from initial if 
several exchanges were merged.
      * @param discoCache Disco cache.
      */
-    void onExchangeDone(DiscoCache discoCache);
-
-    /**
-     * @param nodeId Node ID
-     * @param activeQueries Active queries.
-     */
-    void processClientActiveQueries(UUID nodeId, @Nullable GridLongList 
activeQueries);
+    void onExchangeDone(AffinityTopologyVersion initialVersion,
+        AffinityTopologyVersion resultVersion, DiscoCache discoCache);
 
     /**
      * @return Coordinator.
      */
-    @Nullable MvccCoordinator currentCoordinator();
-
-    /**
-     * @return Current coordinator node ID.
-     */
-    UUID currentCoordinatorId();
+    @NotNull MvccCoordinator currentCoordinator();
 
     /**
      * @param crdVer Mvcc coordinator version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index d026607..b29c782 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,7 +41,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -51,7 +51,6 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -106,6 +105,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -156,11 +156,8 @@ public class MvccProcessorImpl extends 
GridProcessorAdapter implements MvccProce
         MvccProcessorImpl.crdC = crdC;
     }
 
-    /** Topology version when local node was assigned as coordinator. */
-    private volatile long crdVer;
-
     /** */
-    private volatile MvccCoordinator curCrd;
+    private volatile MvccCoordinator curCrd = new 
MvccCoordinator(AffinityTopologyVersion.NONE);
 
     /** */
     private TxLog txLog;
@@ -218,9 +215,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProce
     /** Flag whether all nodes in cluster support MVCC. */
     private volatile boolean mvccSupported = true;
 
-    /** Flag whether coordinator was changed by the last discovery event. */
-    private volatile boolean crdChanged;
-
     /**
      * Maps failed node id to votes accumulator for that node.
      */
@@ -237,12 +231,7 @@ public class MvccProcessorImpl extends 
GridProcessorAdapter implements MvccProce
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        ctx.event().addLocalEventListener(new GridLocalEventListener() {
-                @Override public void onEvent(Event evt) {
-                    onDiscovery((DiscoveryEvent)evt);
-                }
-            },
-            EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED);
+        ctx.event().addDiscoveryEventListener(this::onDiscovery, 
EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_CLIENT_NODE_DISCONNECTED, EVT_NODE_JOINED);
 
         ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new 
CoordinatorMessageListener());
 
@@ -370,81 +359,78 @@ public class MvccProcessorImpl extends 
GridProcessorAdapter implements MvccProce
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(DiscoCache discoCache) {
+    @Override public void onExchangeDone(AffinityTopologyVersion 
initialVersion,
+        AffinityTopologyVersion resultVersion, DiscoCache discoCache) {
         MvccCoordinator curCrd0 = curCrd;
 
-        if (crdChanged) {
+        if (coordinatorChanged(curCrd0, initialVersion, resultVersion)) {
             // Rollback all transactions with old snapshots.
             ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange();
 
             // Complete init future if local node is a new coordinator. All 
previous txs are already completed here.
-            if (crdVer != 0 && !initFut.isDone()) {
-                assert curCrd0 != null && 
curCrd0.nodeId().equals(ctx.localNodeId());
-
+            if (curCrd0.local())
                 initFut.onDone();
-            }
-
-            crdChanged = false;
         }
         else {
-            if (curCrd0 != null && ctx.localNodeId().equals(curCrd0.nodeId()) 
&& discoCache != null)
+            if (curCrd0.local() && discoCache != null)
                 cleanupOrphanedServerTransactions(discoCache.serverNodes());
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void onLocalJoin(DiscoveryEvent evt) {
-        assert evt.type() == EVT_NODE_JOINED && 
ctx.localNodeId().equals(evt.eventNode().id());
+    @Override public void onLocalJoin(DiscoveryEvent evt, DiscoCache 
discoCache) {
+        assert evt.type() == EVT_NODE_JOINED && evt.eventNode().isLocal();
+
+        AffinityTopologyVersion topVer = discoCache.version();
+        List<ClusterNode> nodes = discoCache.allNodes();
 
-        onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), 
false);
+        checkMvccSupported(nodes);
+
+        onCoordinatorChanged(topVer, nodes, false);
     }
 
     /**
-     * Discovery listener. Note: initial join event is handled by {@link 
MvccProcessorImpl#onLocalJoin(DiscoveryEvent)}
+     * Discovery listener. Note: initial join event is handled by {@link 
MvccProcessorImpl#onLocalJoin}
      * method.
      *
      * @param evt Discovery event.
      */
-    private void onDiscovery(DiscoveryEvent evt) {
-        assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT || 
evt.type() == EVT_NODE_JOINED;
+    private void onDiscovery(DiscoveryEvent evt, DiscoCache discoCache) {
+        assert evt.type() == EVT_NODE_FAILED
+            || evt.type() == EVT_NODE_LEFT
+            || evt.type() == EVT_NODE_JOINED
+            || evt.type() == EVT_CLIENT_NODE_DISCONNECTED;
 
         UUID nodeId = evt.eventNode().id();
+        AffinityTopologyVersion topVer = discoCache.version();
+        List<ClusterNode> nodes = discoCache.allNodes();
+
+        checkMvccSupported(nodes);
 
         MvccCoordinator curCrd0 = curCrd;
 
         if (evt.type() == EVT_NODE_JOINED) {
-            if (curCrd0 == null) // Handle join event only if coordinator has 
not been elected yet.
-                onCoordinatorChanged(evt.topologyNodes(), 
evt.topologyVersion(), false);
-
-            return;
+            if (curCrd0.nodeId() == null) // Handle join event only if 
coordinator has not been elected yet.
+                onCoordinatorChanged(topVer, nodes, false);
         }
+        else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+            if (curCrd0.nodeId() != null) {
+                // 1. Notify all listeners waiting for a snapshot.
+                onCoordinatorFailed(curCrd0.nodeId());
 
-        // Process mvcc coordinator left event on the rest nodes.
-        if (nodeId.equals(curCrd0.nodeId())) {
-            // 1. Notify all listeners waiting for a snapshot.
-            Map<Long, MvccSnapshotResponseListener> map = 
snapLsnrs.remove(nodeId);
-
-            if (map != null) {
-                ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException("Failed to request mvcc " +
-                    "version, coordinator failed: " + nodeId);
-
-                MvccSnapshotResponseListener lsnr;
-
-                for (Long id : map.keySet()) {
-                    if ((lsnr = map.remove(id)) != null)
-                        lsnr.onError(ex);
-                }
+                // 2. Process coordinator change.
+                onCoordinatorChanged(topVer, Collections.emptyList(), false);
             }
+        }
+        else if (nodeId.equals(curCrd0.nodeId())) {
+            // 1. Notify all listeners waiting for a snapshot.
+            onCoordinatorFailed(nodeId);
 
-            // 2. Notify acknowledge futures.
-            for (WaitAckFuture fut : ackFuts.values())
-                fut.onNodeLeft(nodeId);
-
-            // 3. Process coordinator change.
-            onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), 
true);
+            // 2. Process coordinator change.
+            onCoordinatorChanged(topVer, discoCache.allNodes(), true);
         }
         // Process node left event on the current mvcc coordinator.
-        else if (curCrd0.nodeId().equals(ctx.localNodeId())) {
+        else if (curCrd0.local()) {
             // 1. Notify active queries.
             activeQueries.onNodeFailed(nodeId);
 
@@ -471,65 +457,83 @@ public class MvccProcessorImpl extends 
GridProcessorAdapter implements MvccProce
         }
     }
 
+    /** */
+    private void onCoordinatorFailed(UUID nodeId) {
+        // 1. Notify all listeners waiting for a snapshot.
+        Map<Long, MvccSnapshotResponseListener> map = snapLsnrs.remove(nodeId);
+
+        if (map != null) {
+            ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException("Failed to request mvcc " +
+                "version, coordinator left: " + nodeId);
+
+            MvccSnapshotResponseListener lsnr;
+
+            for (Long id : map.keySet()) {
+                if ((lsnr = map.remove(id)) != null)
+                    lsnr.onError(ex);
+            }
+        }
+
+        // 2. Notify acknowledge futures.
+        for (WaitAckFuture fut : ackFuts.values())
+            fut.onNodeLeft(nodeId);
+    }
+
     /**
      * Coordinator change callback. Performs all needed actions for handling 
new coordinator assignment.
      *
-     * @param nodes Cluster topology snapshot.
-     * @param topVer Topology version.
      * @param sndQrys {@code True} if it is need to send an active queries 
list to the new coordinator.
      */
-    private void onCoordinatorChanged(Collection<ClusterNode> nodes, long 
topVer, boolean sndQrys) {
-        MvccCoordinator newCrd = pickMvccCoordinator(nodes, topVer);
+    private void onCoordinatorChanged(AffinityTopologyVersion topVer, 
Collection<ClusterNode> nodes, boolean sndQrys) {
+        MvccCoordinator crd0 = pickMvccCoordinator(nodes, topVer);
 
-        if (newCrd == null)
-            return;
+        if (sndQrys && crd0.nodeId() == null)
+            sndQrys = false; // Nowhere to send
 
-        // Update current coordinator, collect active queries and send it to 
the new coordinator if needed.
-        GridLongList activeQryTrackers = null;
+        assert crd0.topologyVersion().compareTo(curCrd.topologyVersion()) >= 0;
 
-        synchronized (activeTrackers) {
-            assert  curCrd == null || 
newCrd.topologyVersion().compareTo(curCrd.topologyVersion()) > 0;
+        curCrd = crd0;
 
-            if (sndQrys) {
-                activeQryTrackers = new GridLongList();
+        GridLongList qrys = null;
 
-                for (MvccQueryTracker tracker : activeTrackers.values()) {
-                    long trackerId = tracker.onMvccCoordinatorChange(newCrd);
+        if (sndQrys) {
+            qrys = new GridLongList();
 
-                    if (trackerId != MVCC_TRACKER_ID_NA)
-                        activeQryTrackers.add(trackerId);
-                }
-            }
+            for (MvccQueryTracker tracker : activeTrackers.values()) {
+                long trackerId = tracker.onMvccCoordinatorChange(crd0);
 
-            curCrd = newCrd;
+                if (trackerId != MVCC_TRACKER_ID_NA)
+                    qrys.add(trackerId);
+            }
         }
 
-        // Send local active queries to remote coordinator, if needed.
-        if (!newCrd.nodeId().equals(ctx.localNodeId())) {
+        if (crd0.local())
+            prevCrdQueries.init(qrys, F.view(nodes, this::supportsMvcc), 
ctx.discovery());
+            // Send local active queries to remote coordinator, if needed.
+        else if (qrys != null && !qrys.isEmpty()) {
             try {
-                if (sndQrys)
-                    sendMessage(newCrd.nodeId(), new 
MvccActiveQueriesMessage(activeQryTrackers));
+                sendMessage(crd0.nodeId(), new MvccActiveQueriesMessage(qrys));
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to send active queries to mvcc 
coordinator: " + e);
             }
         }
-        // If a current node was elected as a new mvcc coordinator, we need to 
pre-initialize it.
-        else {
-            assert crdVer == 0 : crdVer;
-
-            crdVer = newCrd.coordinatorVersion();
+    }
 
-            if (log.isInfoEnabled())
-                log.info("Initialize local node as mvcc coordinator [node=" + 
ctx.localNodeId() +
-                    ", crdVer=" + crdVer + ']');
+    /**
+     * @param currCrd Current Mvcc coordinator.
+     * @param from Start topology version.
+     * @param to End topology version.
+     * @return {@code True} if coordinator was changed between two passed 
topology versions.
+     */
+    private boolean coordinatorChanged(MvccCoordinator currCrd, 
AffinityTopologyVersion from,
+        AffinityTopologyVersion to) {
+        if (currCrd == null)
+            return false;
 
-            prevCrdQueries.init(activeQryTrackers, F.view(nodes, 
this::supportsMvcc), ctx.discovery());
+        AffinityTopologyVersion crdVersion = currCrd.topologyVersion();
 
-            // Do not complete init future here, because we should wait until 
all old transactions become terminated.
-        }
-
-        crdChanged = true;
+        return from.compareTo(crdVersion) <= 0 && to .compareTo(crdVersion) >= 
0;
     }
 
     /**
@@ -561,23 +565,11 @@ public class MvccProcessorImpl extends 
GridProcessorAdapter implements MvccProce
     }
 
     /** {@inheritDoc} */
-    @Override public void processClientActiveQueries(UUID nodeId, @Nullable 
GridLongList activeQueries) {
-        prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public MvccCoordinator currentCoordinator() {
+    @Override @NotNull public MvccCoordinator currentCoordinator() {
         return curCrd;
     }
 
     /** {@inheritDoc} */
-    @Override public UUID currentCoordinatorId() {
-        MvccCoordinator curCrd = this.curCrd;
-
-        return curCrd != null ? curCrd.nodeId() : null;
-    }
-
-    /** {@inheritDoc} */
     @Override public byte state(MvccVersion ver) throws IgniteCheckedException 
{
         return state(ver.coordinatorVersion(), ver.counter());
     }
@@ -661,7 +653,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProce
     @Override public MvccSnapshot tryRequestSnapshotLocal(@Nullable 
IgniteInternalTx tx) throws ClusterTopologyCheckedException {
         MvccCoordinator crd = currentCoordinator();
 
-        if (crd == null)
+        if (crd.nodeId() == null)
             throw noCoordinatorError();
 
         if (tx != null) {
@@ -672,7 +664,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProce
                     "for the locked topology version. [crd=" + crd + ", tx=" + 
tx + ']');
         }
 
-        if (!ctx.localNodeId().equals(crd.nodeId()) || !initFut.isDone())
+        if (!crd.local() || !initFut.isDone())
             return null;
         else if (tx != null)
             return assignTxSnapshot(0L, ctx.localNodeId(), false);
@@ -698,7 +690,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProce
     @Override public void requestSnapshotAsync(IgniteInternalTx tx, 
MvccSnapshotResponseListener lsnr) {
         MvccCoordinator crd = currentCoordinator();
 
-        if (crd == null) {
+        if (crd.nodeId() == null) {
             lsnr.onError(noCoordinatorError());
 
             return;
@@ -759,10 +751,10 @@ public class MvccProcessorImpl extends 
GridProcessorAdapter implements MvccProce
 
         MvccCoordinator crd = curCrd;
 
-        if (updateVer.coordinatorVersion() == crd.coordinatorVersion())
-            return sendTxCommit(crd, new 
MvccAckRequestTx(futIdCntr.incrementAndGet(), updateVer.counter()));
+        if (crd.version() != updateVer.coordinatorVersion())
+            return new GridFinishedFuture<>();
 
-        return new GridFinishedFuture<>();
+        return sendTxCommit(crd, new 
MvccAckRequestTx(futIdCntr.incrementAndGet(), updateVer.counter()));
     }
 
     /** {@inheritDoc} */
@@ -771,7 +763,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter 
implements MvccProce
 
         MvccCoordinator crd = curCrd;
 
-        if (crd.coordinatorVersion() != updateVer.coordinatorVersion())
+        if (crd.version() != updateVer.coordinatorVersion())
             return;
 
         MvccAckRequestTx msg = new MvccAckRequestTx((long)-1, 
updateVer.counter());
@@ -794,17 +786,18 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     @Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) {
         MvccCoordinator crd = currentCoordinator();
 
-        if (crd == null || snapshot != null
-            && crd.coordinatorVersion() == snapshot.coordinatorVersion()
-            && sendQueryDone(crd, new 
MvccAckRequestQueryCntr(queryTrackCounter(snapshot))))
+        if (crd.nodeId() == null || snapshot == null)
             return;
 
-        Message msg = new MvccAckRequestQueryId(qryId);
+        if (crd.version() != snapshot.coordinatorVersion()
+            || !sendQueryDone(crd, new 
MvccAckRequestQueryCntr(queryTrackCounter(snapshot)))) {
+            Message msg = new MvccAckRequestQueryId(qryId);
 
-        do {
-            crd = currentCoordinator();
+            do {
+                crd = currentCoordinator();
+            }
+            while (!sendQueryDone(crd, msg));
         }
-        while (!sendQueryDone(crd, msg));
     }
 
     /** {@inheritDoc} */
@@ -871,13 +864,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     /**
      * Picks mvcc coordinator from the given list of nodes.
      *
-     * @param nodes List of nodes.
-     * @param topVer Topology version.
      * @return Chosen mvcc coordinator.
      */
-    private MvccCoordinator pickMvccCoordinator(Collection<ClusterNode> nodes, 
long topVer) {
-        checkMvccSupported(nodes);
-
+    private @NotNull MvccCoordinator 
pickMvccCoordinator(Collection<ClusterNode> nodes, AffinityTopologyVersion 
topVer) {
         ClusterNode crdNode = null;
 
         if (crdC != null) {
@@ -897,15 +886,16 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             }
         }
 
-        MvccCoordinator crd = crdNode != null ? new 
MvccCoordinator(crdNode.id(), coordinatorVersion(crdNode),
-            new AffinityTopologyVersion(topVer, 0)) : null;
+        if (crdNode != null)
+            return new MvccCoordinator(topVer, crdNode.id(), 
coordinatorVersion(crdNode), crdNode.isLocal());
 
-        if (log.isInfoEnabled() && crd != null)
-            log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + 
crdNode + ']');
-        else if (crd == null)
-            U.warn(log, "New mvcc coordinator was not assigned [topVer=" + 
topVer + ']');
+//        TODO
+//        if (log.isInfoEnabled() && crd != null)
+//            log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" 
+ crdNode + ']');
+//        else if (crd == null)
+//            U.warn(log, "New mvcc coordinator was not assigned [topVer=" + 
topVer + ']');
 
-        return crd;
+        return new MvccCoordinator(topVer);
     }
 
 
@@ -964,9 +954,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
     /** */
     private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, 
boolean client) {
-        assert initFut.isDone();
-        assert crdVer != 0;
-        assert ctx.localNodeId().equals(currentCoordinatorId());
+        assert initFut.isDone() && curCrd != null && curCrd.local();
 
         MvccSnapshotResponse res = new MvccSnapshotResponse();
 
@@ -998,7 +986,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
         cleanup = prevCrdQueries.previousQueriesDone() ? cleanup - 1 : 
MVCC_COUNTER_NA;
 
-        res.init(futId, crdVer, ver, MVCC_START_OP_CNTR, cleanup, tracking);
+        res.init(futId, curCrd.version(), ver, MVCC_START_OP_CNTR, cleanup, 
tracking);
 
         return res;
     }
@@ -1121,11 +1109,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     IgniteInternalFuture<VacuumMetrics> runVacuum() {
         assert !ctx.clientNode();
 
-        MvccCoordinator crd0 = currentCoordinator();
-
-        if (Thread.currentThread().isInterrupted() ||
-            crd0 == null ||
-            crdVer == 0 && ctx.localNodeId().equals(crd0.nodeId()))
+        if (Thread.currentThread().isInterrupted())
             return new GridFinishedFuture<>(new VacuumMetrics());
 
         final GridFutureAdapter<VacuumMetrics> res = new GridFutureAdapter<>();
@@ -1236,8 +1220,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                                         if (U.assertionsEnabled()) {
                                             MvccCoordinator crd = 
currentCoordinator();
 
-                                            assert crd != null
-                                                && crd.coordinatorVersion() >= 
snapshot.coordinatorVersion();
+                                            assert crd.version() >= 
snapshot.coordinatorVersion();
 
                                             for (TxKey key : waitMap.keySet()) 
{
                                                 if (!( key.major() == 
snapshot.coordinatorVersion()
@@ -1318,10 +1301,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     /**
      * @param crd Mvcc coordinator.
      * @param msg Message.
-     * @return {@code True} if no need to resend the message to a new 
coordinator.
+     * @return {@code True} if the message was sent successfully.
      */
+    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
     private boolean sendQueryDone(MvccCoordinator crd, Message msg) {
-        if (crd == null)
+        if (crd.nodeId() == null)
             return true; // no need to send ack;
 
         try {
@@ -1336,7 +1320,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             MvccCoordinator crd0 = currentCoordinator();
 
             // Coordinator is unassigned or still the same.
-            return crd0 == null || crd.coordinatorVersion() == 
crd0.coordinatorVersion();
+            return crd0.nodeId() == null || crd.version() == crd0.version();
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + 
msg + ']', e);
@@ -1502,7 +1486,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
      * @param nodeId Node ID.
      * @param msg Message.
      */
-    private void processCoordinatorActiveQueriesMessage(UUID nodeId, 
MvccActiveQueriesMessage msg) {
+    private void processActiveQueriesMessage(UUID nodeId, 
MvccActiveQueriesMessage msg) {
         prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
     }
 
@@ -1558,7 +1542,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             if (minQry == null)
                 minQry = tracking;
 
-            res.init(futId, crdVer, ver, MVCC_READ_OP_CNTR, MVCC_COUNTER_NA, 
tracking);
+            res.init(futId, curCrd.version(), ver, MVCC_READ_OP_CNTR, 
MVCC_COUNTER_NA, tracking);
 
             return res;
         }
@@ -1663,7 +1647,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             if (msg0.waitForCoordinatorInit() && !initFut.isDone()) {
                 initFut.listen(new 
IgniteInClosure<IgniteInternalFuture<Void>>() {
                     @Override public void apply(IgniteInternalFuture<Void> 
future) {
-                        assert crdVer != 0L;
+                        assert curCrd.local();
 
                         processMessage(nodeId, msg);
                     }
@@ -1695,7 +1679,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             else if (msg instanceof MvccAckRequestQueryId)
                 processNewCoordinatorQueryAckRequest(nodeId, 
(MvccAckRequestQueryId)msg);
             else if (msg instanceof MvccActiveQueriesMessage)
-                processCoordinatorActiveQueriesMessage(nodeId, 
(MvccActiveQueriesMessage)msg);
+                processActiveQueriesMessage(nodeId, 
(MvccActiveQueriesMessage)msg);
             else if (msg instanceof MvccRecoveryFinishedMessage)
                 processRecoveryFinishedMessage(nodeId, 
((MvccRecoveryFinishedMessage)msg));
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
index d93a2e9..4db1c2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
@@ -147,8 +147,8 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
         if (snapshot != null) {
             assert crdVer != 0 : this;
 
-            if (crdVer != newCrd.coordinatorVersion()) {
-                crdVer = newCrd.coordinatorVersion();
+            if (crdVer != newCrd.version()) {
+                crdVer = newCrd.version();
 
                 return id;
             }
@@ -223,7 +223,7 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
             if (done)
                 return false;
 
-            crdVer = crd.coordinatorVersion();
+            crdVer = crd.version();
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 43f87e3..b717895 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -188,7 +188,7 @@ public class MvccUtils {
 
         if ((state == TxState.NA || state == TxState.PREPARED)
             && (proc.currentCoordinator() == null // Recovery from WAL.
-            || mvccCrd < proc.currentCoordinator().coordinatorVersion()))
+            || mvccCrd < proc.currentCoordinator().version()))
             state = TxState.ABORTED;
 
         return state;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
index 1c022b0..38ca146 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
@@ -26,6 +26,7 @@ import java.io.ObjectOutput;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.NoSuchElementException;
+import java.util.stream.Collector;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;

Reply via email to