Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 688e9b041 -> 5a663e67f


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a663e67
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a663e67
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a663e67

Branch: refs/heads/ignite-5578
Commit: 5a663e67f078f615abadb4187fe6c1a9b5b5e96a
Parents: 688e9b0
Author: sboikov <[email protected]>
Authored: Fri Jul 21 11:40:16 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Jul 21 18:27:09 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  28 +-
 .../GridCachePartitionExchangeManager.java      |  15 +-
 .../GridDhtPartitionsAbstractMessage.java       |  15 +-
 .../GridDhtPartitionsExchangeFuture.java        | 421 +++++++++++++------
 .../GridDhtPartitionsSingleMessage.java         |  39 +-
 .../dht/preloader/InitNewCoordinatorFuture.java | 179 ++++++++
 .../distributed/CacheExchangeMergeTest.java     | 271 ++++++++++--
 7 files changed, 784 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 7f55e79..d5b4be7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1564,19 +1564,31 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * Called on exchange initiated by server node leave.
      *
      * @param fut Exchange future.
+     * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if affinity should be assigned by coordinator.
      */
-    public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) 
throws IgniteCheckedException {
+    public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, 
boolean crd) throws IgniteCheckedException {
         ClusterNode leftNode = fut.discoveryEvent().eventNode();
 
         assert !leftNode.isClient() : leftNode;
 
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (grp.isLocal())
-                continue;
+        if (crd) {
+            // Need to initialize CacheGroupHolders if this node became 
coordinator on this exchange.
+            forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
+                @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
+                    CacheGroupHolder cache = 
groupHolder(fut.topologyVersion(), desc);
 
-            grp.affinity().calculate(fut.topologyVersion(), 
fut.discoveryEvent(), fut.discoCache());
+                    cache.aff.calculate(fut.topologyVersion(), 
fut.discoveryEvent(), fut.discoCache());
+                }
+            });
+        }
+        else {
+            forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
+                @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
+                    aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), 
fut.discoCache());
+                }
+            });
         }
 
         synchronized (mux) {
@@ -1600,12 +1612,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 CacheGroupHolder grpHolder = grpHolders.get(desc.groupId());
 
-                if (grpHolder != null) {
-                    if (grpHolder.client()) // Affinity for non-client holders 
calculated in {@link #onServerLeft}.
-                        grpHolder.affinity().calculate(fut.topologyVersion(), 
fut.discoveryEvent(), fut.discoCache());
-
+                if (grpHolder != null)
                     return;
-                }
 
                 // Need initialize holders and affinity if this node became 
coordinator during this exchange.
                 final Integer grpId = desc.groupId();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 93b1729..d26ca0a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -598,7 +598,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @param ver Node version.
      * @return Supported exchange protocol version.
      */
-    static int exchangeProtocolVersion(IgniteProductVersion ver) {
+    public static int exchangeProtocolVersion(IgniteProductVersion ver) {
         if (ver.compareToIgnoreTimestamp(EXCHANGE_PROTOCOL_2_SINCE) >= 0)
             return 2;
 
@@ -1884,6 +1884,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         /** Busy flag used as performance optimization to stop current 
preloading. */
         private volatile boolean busy;
 
+        /** */
+        private boolean crd;
+
         /**
          * Constructor.
          */
@@ -2112,7 +2115,15 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                             lastInitializedFut = exchFut;
 
-                            exchFut.init();
+                            boolean newCrd = false;
+
+                            if (!crd) {
+                                List<ClusterNode> srvNodes = 
exchFut.discoCache().serverNodes();
+
+                                crd = newCrd = !srvNodes.isEmpty() && 
srvNodes.get(0).isLocal();
+                            }
+
+                            exchFut.init(newCrd);
 
                             int dumpCnt = 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 466ec03..9adbf0b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -34,7 +34,10 @@ import org.jetbrains.annotations.Nullable;
  */
 public abstract class GridDhtPartitionsAbstractMessage extends 
GridCacheMessage {
     /** */
-    protected static final byte COMPRESSED_FLAG_MASK = 1;
+    private static final byte COMPRESSED_FLAG_MASK = 0x01;
+
+    /** */
+    private static final byte RESTORE_STATE_FLAG_MASK = 0x02;
 
     /** */
     private static final long serialVersionUID = 0L;
@@ -46,7 +49,7 @@ public abstract class GridDhtPartitionsAbstractMessage 
extends GridCacheMessage
     private GridCacheVersion lastVer;
 
     /** */
-    private byte flags;
+    protected byte flags;
 
     /**
      * Required by {@link Externalizable}.
@@ -131,6 +134,14 @@ public abstract class GridDhtPartitionsAbstractMessage 
extends GridCacheMessage
         flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : 
(byte)(flags & ~COMPRESSED_FLAG_MASK);
     }
 
+    public void restoreState(boolean restoreState) {
+        flags = restoreState ? (byte)(flags | RESTORE_STATE_FLAG_MASK) : 
(byte)(flags & ~RESTORE_STATE_FLAG_MASK);
+    }
+
+    boolean restoreState() {
+        return (flags & RESTORE_STATE_FLAG_MASK) != 0;
+    }
+
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
         return 5;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 68e9951..3674276 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -78,8 +78,8 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -88,7 +88,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -175,7 +174,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * Messages received on non-coordinator are stored in case if this node
      * becomes coordinator.
      */
-    private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new 
ConcurrentHashMap8<>();
+    private final Map<UUID, GridDhtPartitionsSingleMessage> pendingSingleMsgs 
= new ConcurrentHashMap8<>();
 
     /** Messages received from new coordinator. */
     private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = 
new ConcurrentHashMap8<>();
@@ -243,6 +242,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private FinishState finishState;
 
+    /** */
+    @GridToStringExclude
+    private InitNewCoordinatorFuture newCrdFut;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -277,6 +280,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             log.debug("Creating exchange future [localNode=" + 
cctx.localNodeId() + ", fut=" + this + ']');
     }
 
+    GridCacheSharedContext sharedContext() {
+        return cctx;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean skipForExchangeMerge() {
         return false;
@@ -456,7 +463,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      *
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    public void init() throws IgniteInterruptedCheckedException {
+    public void init(boolean newCrd) throws IgniteInterruptedCheckedException {
         if (isDone())
             return;
 
@@ -484,24 +491,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             assert state == null : state;
 
-            if (crdNode) {
-                if (!remaining.isEmpty()) {
-                    IgniteInternalFuture<?>  fut = 
cctx.affinity().initCoordinatorCaches(this, false);
-
-                    if (fut != null)
-                        fut.get();
-                }
-
+            if (crdNode)
                 state = ExchangeLocalState.CRD;
-            }
             else
                 state = cctx.kernalContext().clientNode() ? 
ExchangeLocalState.CLIENT : ExchangeLocalState.SRV;
 
             exchLog.info("Started exchange init [topVer=" + topVer +
                 ", crd=" + crdNode +
-                ", evt=" + discoEvt.type() +
-                ", node=" + discoEvt.node() +
-                ", evtNode=" + discoEvt.node() +
+                ", evt=" + IgniteUtils.gridEventName(discoEvt.type()) +
+                ", evtNode=" + discoEvt.eventNode().id() +
                 ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT 
? ((DiscoveryCustomEvent)discoEvt).customMessage() : null) +
                 ']');
 
@@ -571,6 +569,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
+            if (newCrd) {
+                IgniteInternalFuture<?> fut = 
cctx.affinity().initCoordinatorCaches(this, false);
+
+                if (fut != null)
+                    fut.get();
+            }
+
             updateTopologies(crdNode);
 
             if (exchange != null) {
@@ -867,7 +872,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             warnNoAffinityNodes();
 
-            centralizedAff = cctx.affinity().onServerLeft(this);
+            centralizedAff = cctx.affinity().onServerLeft(this, crd);
         }
         else
             cctx.affinity().onServerJoin(this, crd);
@@ -1199,7 +1204,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * @return {@code True} if exchange for local node join.
      */
-    private boolean localJoinExchange() {
+    boolean localJoinExchange() {
         return discoEvt.type() == EVT_NODE_JOINED && 
discoEvt.eventNode().isLocal();
     }
 
@@ -1277,13 +1282,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * @param nodes Nodes.
      * @param joinedNodeAff Affinity if was requested by some nodes.
-     * @throws IgniteCheckedException If failed.
      */
     private void sendAllPartitions(
         GridDhtPartitionsFullMessage msg,
         Collection<ClusterNode> nodes,
-        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff)
-        throws IgniteCheckedException {
+        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
         boolean singleNode = nodes.size() == 1;
 
         GridDhtPartitionsFullMessage joinedNodeMsg = null;
@@ -1334,13 +1337,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 cctx.io().send(node, sndMsg, SYSTEM_POOL);
             }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send partitions, node failed: " + 
node);
+            }
             catch (IgniteCheckedException e) {
-                if (cctx.io().checkNodeLeft(node.id(), e, false)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send partitions, node failed: " + 
node);
-                }
-                else
-                    U.error(log, "Failed to send partitions [node=" + node + 
']', e);
+                U.error(log, "Failed to send partitions [node=" + node + ']', 
e);
             }
         }
     }
@@ -1498,7 +1500,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * Cleans up resources to avoid excessive memory usage.
      */
     public void cleanUp() {
-        singleMsgs.clear();
+        pendingSingleMsgs.clear();
         fullMsgs.clear();
         msgs.clear();
         changeGlobalStateExceptions.clear();
@@ -1650,6 +1652,21 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         assert exchId.equals(msg.exchangeId()) : msg;
         assert !cctx.kernalContext().clientNode();
 
+        if (msg.restoreState()) {
+            InitNewCoordinatorFuture newCrdFut0;
+
+            synchronized (this) {
+                assert newCrdFut != null;
+
+                newCrdFut0 = newCrdFut;
+            }
+
+            newCrdFut0.onMessage(node, msg);
+
+            return;
+        }
+
+
         if (!msg.client()) {
             assert msg.lastVersion() != null : msg;
 
@@ -1697,7 +1714,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
     /**
      * @param nodeId Node ID.
-     * @param msg
+     * @param msg Client's message.
      */
     private void waitAndReplyToClient(final UUID nodeId, final 
GridDhtPartitionsSingleMessage msg) {
         assert msg.client();
@@ -1780,7 +1797,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     log.info("Non-coordinator received single message [ver=" + 
initialVersion() +
                         ", node=" + nodeId + ", state=" + state + ']');
 
-                    singleMsgs.put(nodeId, msg);
+                    pendingSingleMsgs.put(nodeId, msg);
 
                     break;
                 }
@@ -2292,7 +2309,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 onDone(exchCtx.events().topologyVersion(), err);
 
-                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
singleMsgs.entrySet())
+                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
pendingSingleMsgs.entrySet())
                     processSingleMessage(e.getKey(), e.getValue());
             }
         }
@@ -2379,7 +2396,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     return;
                 }
 
-                processFullMessage(node, msg);
+                processFullMessage(true, node, msg);
             }
         });
     }
@@ -2389,11 +2406,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param msg Message.
      */
     public void onReceivePartitionRequest(final ClusterNode node, final 
GridDhtPartitionsSingleRequest msg) {
-        assert !cctx.kernalContext().clientNode();
-        assert !node.isDaemon() : node;
-
-        if (!cctx.discovery().alive(node.id()))
-            return;
+        assert !cctx.kernalContext().clientNode() || msg.restoreState();
+        assert !node.isDaemon() && !CU.clientNode(node) : node;
 
         initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
             @Override public void apply(IgniteInternalFuture<Boolean> fut) {
@@ -2407,7 +2421,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param msg Message.
      */
     private void processSinglePartitionRequest(ClusterNode node, 
GridDhtPartitionsSingleRequest msg) {
+        FinishState finishState0 = null;
+
         synchronized (this) {
+            if (crd == null) {
+                log.info("Ignore partitions request, no coordinator [node=" + 
node.id() + ']');
+
+                return;
+            }
+
             switch (state) {
                 case DONE: {
                     assert finishState != null;
@@ -2418,6 +2440,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         return;
                     }
 
+                    finishState0 = finishState;
+
                     break;
                 }
 
@@ -2428,15 +2452,64 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     return;
                 }
 
+                case CLIENT:
                 case SRV: {
+                    if (!cctx.discovery().alive(node)) {
+                        log.info("Ignore restore state request, node is not 
alive [node=" + node.id() + ']');
+
+                        return;
+                    }
+
+                    if (msg.restoreState()) {
+                        if (!node.equals(crd)) {
+                            if (node.order() > crd.order()) {
+                                log.info("Received restore state request, 
change coordinator [oldCrd=" + crd.id() +
+                                    "newCrd=" + node.id() + ']');
+
+                                crd = node; // Do not allow to process 
FullMessage from old coordinator.
+                            }
+                            else {
+                                log.info("Ignore restore state request, 
coordinator changed [oldCrd=" + crd.id() +
+                                    "newCrd=" + node.id() + ']');
+
+                                return;
+                            }
+                        }
+                    }
 
                     break;
                 }
 
+                default:
+                    assert false : state;
             }
         }
 
-        // TODO 5578, backward compatibility, send state if available.
+        if (msg.restoreState()) {
+            try {
+                GridDhtPartitionsSingleMessage res = 
cctx.exchange().createPartitionsSingleMessage(msg.exchangeId(),
+                    cctx.kernalContext().clientNode(),
+                    true);
+
+                if (localJoinExchange())
+                    
res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
+
+                res.restoreState(true);
+
+                res.finishMessage(finishState0 != null ? finishState0.msg : 
null);
+
+                cctx.io().send(node, res, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Node left during partition exchange [nodeId=" + 
node.id() + ", exchId=" + exchId + ']');
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send partitions message [node=" + node 
+ ", msg=" + msg + ']', e);
+            }
+
+            return;
+        }
 
         try {
             sendLocalPartitions(node);
@@ -2450,60 +2523,66 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param node Sender node.
      * @param msg Message.
      */
-    private void processFullMessage(ClusterNode node, 
GridDhtPartitionsFullMessage msg) {
+    private void processFullMessage(boolean checkCrd, ClusterNode node, 
GridDhtPartitionsFullMessage msg) {
         try {
             assert exchId.equals(msg.exchangeId()) : msg;
             assert msg.lastVersion() != null : msg;
 
-            synchronized (this) {
-                if (crd == null) {
-                    log.info("Ignore full message, all server nodes left: " + 
msg);
+            if (checkCrd) {
+                assert node != null;
 
-                    return;
-                }
-
-                switch (state) {
-                    case CRD:
-                    case BECOME_CRD: {
-                        log.info("Ignore full message, node is coordinator: " 
+ msg);
+                synchronized (this) {
+                    if (crd == null) {
+                        log.info("Ignore full message, all server nodes left: 
" + msg);
 
                         return;
                     }
 
-                    case DONE: {
-                        log.info("Ignore full message, future is done: " + 
msg);
+                    switch (state) {
+                        case CRD:
+                        case BECOME_CRD: {
+                            log.info("Ignore full message, node is 
coordinator: " + msg);
 
-                        return;
-                    }
+                            return;
+                        }
 
-                    case SRV:
-                    case CLIENT: {
-                        if (!crd.equals(node)) {
-                            log.info("Received full message from 
non-coordinator [node=" + node.id() +
+                        case DONE: {
+                            log.info("Ignore full message, future is done: " + 
msg);
+
+                            return;
+                        }
+
+                        case SRV:
+                        case CLIENT: {
+                            if (!crd.equals(node)) {
+                                log.info("Received full message from 
non-coordinator [node=" + node.id() +
                                     ", nodeOrder=" + node.order() +
                                     ", crd=" + crd.id() +
                                     ", crdOrder=" + crd.order() + ']');
 
-                            if (node.order() > crd.order())
-                                fullMsgs.put(node, msg);
+                                if (node.order() > crd.order())
+                                    fullMsgs.put(node, msg);
 
-                            return;
-                        }
-                        else {
-                            log.info("Received full message, will finish 
exchange [node=" + node.id() +
-                                ", resVer=" + msg.resultTopologyVersion() + 
']');
+                                return;
+                            }
+                            else {
+                                log.info("Received full message, will finish 
exchange [node=" + node.id() +
+                                    ", resVer=" + msg.resultTopologyVersion() 
+ ']');
 
-                            finishState = new FinishState(crd.id(),
-                                msg.resultTopologyVersion() != null ? 
msg.resultTopologyVersion() : initialVersion(),
-                                msg);
+                                finishState = new FinishState(crd.id(),
+                                    msg.resultTopologyVersion() != null ? 
msg.resultTopologyVersion() : initialVersion(),
+                                    msg);
 
-                            state = ExchangeLocalState.DONE;
+                                state = ExchangeLocalState.DONE;
 
-                            break;
+                                break;
+                            }
                         }
                     }
                 }
             }
+            else
+                assert node == null : node;
 
             if (exchCtx.mergeExchanges()) {
                 if (msg.resultTopologyVersion() != null && 
!initialVersion().equals(msg.resultTopologyVersion())) {
@@ -2722,6 +2801,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                         discoCache.updateAlives(node);
 
+                        InitNewCoordinatorFuture newCrdFut0;
+
+                        synchronized (this) {
+                            newCrdFut0 = newCrdFut;
+                        }
+
+                        if (newCrdFut0 != null)
+                            newCrdFut0.onNodeLeft(node.id());
+
                         synchronized (this) {
                             if (!srvNodes.remove(node))
                                 return;
@@ -2756,9 +2844,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                                 case SRV:
                                     assert crd != null;
 
-                                    if (crdChanged && crd.isLocal())
+                                    if (crdChanged && crd.isLocal()) {
                                         state = ExchangeLocalState.BECOME_CRD;
 
+                                        newCrdFut = new 
InitNewCoordinatorFuture();
+                                    }
+
                                     break;
                             }
 
@@ -2770,7 +2861,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         }
 
                         if (crd0 == null) {
-                            assert cctx.kernalContext().clientNode() || 
cctx.localNode().isDaemon() : cctx.localNode();
+                            assert cctx.kernalContext().clientNode() : 
cctx.localNode();
 
                             List<ClusterNode> empty = Collections.emptyList();
 
@@ -2796,20 +2887,16 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                                 log.info("Coordinator failed, node is new 
coordinator [ver=" + initialVersion() +
                                     ", prev=" + node.id() + ']');
 
-                                boolean newAff = localJoinExchange();
+                                assert newCrdFut != null;
 
-                                IgniteInternalFuture<?> fut = 
cctx.affinity().initCoordinatorCaches(
-                                    GridDhtPartitionsExchangeFuture.this, 
newAff);
 
-                                if (fut == null || fut.isDone())
-                                    onBecomeCoordinator();
-                                else {
-                                    fut.listen(new 
CI1<IgniteInternalFuture<?>>() {
-                                        @Override public void 
apply(IgniteInternalFuture<?> fut) {
-                                            onBecomeCoordinator();
-                                        }
-                                    });
-                                }
+                                
newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
+
+                                newCrdFut.listen(new 
CI1<IgniteInternalFuture>() {
+                                    @Override public void 
apply(IgniteInternalFuture fut) {
+                                        
onBecomeCoordinator((InitNewCoordinatorFuture)fut);
+                                    }
+                                });
 
                                 return;
                             }
@@ -2823,22 +2910,25 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         else {
                             if (crdChanged) {
                                 for (Map.Entry<ClusterNode, 
GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) {
-                                    log.info("Coordinator changed, process 
pending full message [" +
-                                        "ver=" + initialVersion() +
-                                        ", crd=" + node.id() +
-                                        ", pendingMsgNode=" + m.getKey() + 
']');
+                                    if (crd0.equals(m.getKey())) {
+                                        log.info("Coordinator changed, process 
pending full message [" +
+                                            "ver=" + initialVersion() +
+                                            ", crd=" + node.id() +
+                                            ", pendingMsgNode=" + m.getKey() + 
']');
+
+                                        processFullMessage(true, m.getKey(), 
m.getValue());
 
-                                    processFullMessage(m.getKey(), 
m.getValue());
+                                        if (isDone())
+                                            return;
+                                    }
                                 }
 
-                                if (!isDone()) {
-                                    log.info("Coordinator changed, send 
partitions to new coordinator [" +
-                                        "ver=" + initialVersion() +
-                                        ", crd=" + node.id() +
-                                        ", newCrd=" + crd0.id() + ']');
+                                log.info("Coordinator changed, send partitions 
to new coordinator [" +
+                                    "ver=" + initialVersion() +
+                                    ", crd=" + node.id() +
+                                    ", newCrd=" + crd0.id() + ']');
 
-                                    sendPartitions(crd0);
-                                }
+                                sendPartitions(crd0);
                             }
                         }
                     }
@@ -2860,56 +2950,131 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * @param newCrdFut Coordinator initialization future.
      */
-    private void onBecomeCoordinator() {
-        Set<UUID> remaining0 = null;
+    private void onBecomeCoordinator(InitNewCoordinatorFuture newCrdFut) {
+        boolean allRcvd = false;
 
-        synchronized (this) {
-            assert crd != null && crd.isLocal();
-
-            state = ExchangeLocalState.CRD;
+        if (newCrdFut.restoreState()) {
+            GridDhtPartitionsFullMessage fullMsg = newCrdFut.fullMessage();
 
-            assert mergedJoinExchMsgs == null;
+            boolean process = fullMsg == null;
 
-            log.info("New coordinator initialization finished [ver=" + 
initialVersion() +
-                ", remaining=" + remaining + ']');
+            Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs0 = 
newCrdFut.messages();
 
-            if (!remaining.isEmpty())
-                remaining0 = new HashSet<>(remaining);
-        }
+            assert msgs.isEmpty() : msgs;
 
-        if (remaining0 != null) {
-            // It is possible that some nodes finished exchange with previous 
coordinator.
-            GridDhtPartitionsSingleRequest req = new 
GridDhtPartitionsSingleRequest(exchId);
+            for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : 
msgs0.entrySet()) {
+                GridDhtPartitionsSingleMessage msg = e.getValue();
 
-            for (UUID nodeId : remaining0) {
-                try {
-                    if (!singleMsgs.containsKey(nodeId)) {
-                        log.info("New coordinator sends request [ver=" + 
initialVersion() +
-                            ", node=" + nodeId + ']');
+                if (!msg.client()) {
+                    msgs.put(e.getKey().id(), e.getValue());
 
-                        cctx.io().send(nodeId, req, SYSTEM_POOL);
-                    }
+                    if (process)
+                        updatePartitionSingleMap(e.getKey().id(), msg);
                 }
-                catch (ClusterTopologyCheckedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Node left during partition exchange 
[nodeId=" + nodeId +
-                            ", exchId=" + exchId + ']');
+            }
+
+            if (fullMsg != null) {
+                log.info("New coordinator restored state [ver=" + 
initialVersion() +
+                    ", resVer=" + fullMsg.resultTopologyVersion() + ']');
+
+                synchronized (this) {
+                    state = ExchangeLocalState.DONE;
+
+                    finishState = new FinishState(crd.id(), 
fullMsg.resultTopologyVersion(), fullMsg);
                 }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to request partitions from node: " + 
nodeId, e);
+
+                processFullMessage(false, null, fullMsg);
+
+                Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = 
newCrdFut.messages();
+
+                if (!F.isEmpty(msgs)) {
+                    Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = 
null;
+
+                    for (Map.Entry<ClusterNode, 
GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
+                        GridDhtPartitionsSingleMessage msg = e.getValue();
+
+                        Collection<Integer> affReq = 
msg.cacheGroupsAffinityRequest();
+
+                        if (!F.isEmpty(affReq)) {
+                            joinedNodeAff = 
CacheGroupAffinityMessage.createAffinityMessages(cctx,
+                                fullMsg.resultTopologyVersion(),
+                                affReq,
+                                joinedNodeAff);
+                        }
+                    }
+
+                    sendAllPartitions(fullMsg, msgs.keySet(), joinedNodeAff);
                 }
+
+                return;
             }
+            else
+                log.info("New coordinator restore state finished [ver=" + 
initialVersion() + ']');
 
-            for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : 
singleMsgs.entrySet()) {
-                log.info("New coordinator process pending message [ver=" + 
initialVersion() +
-                        ", node=" + m.getKey() + ']');
+            allRcvd = true;
 
-                processSingleMessage(m.getKey(), m.getValue());
+            synchronized (this) {
+                remaining.clear(); // Do not process messages.
+
+                assert crd != null && crd.isLocal();
+
+                state = ExchangeLocalState.CRD;
+
+                assert mergedJoinExchMsgs == null;
             }
         }
         else {
+            Set<UUID> remaining0 = null;
+
+            synchronized (this) {
+                assert crd != null && crd.isLocal();
+
+                state = ExchangeLocalState.CRD;
+
+                assert mergedJoinExchMsgs == null;
+
+                log.info("New coordinator initialization finished [ver=" + 
initialVersion() +
+                    ", remaining=" + remaining + ']');
+
+                if (!remaining.isEmpty())
+                    remaining0 = new HashSet<>(remaining);
+            }
+
+            if (remaining0 != null) {
+                // It is possible that some nodes finished exchange with 
previous coordinator.
+                GridDhtPartitionsSingleRequest req = new 
GridDhtPartitionsSingleRequest(exchId);
+
+                for (UUID nodeId : remaining0) {
+                    try {
+                        if (!pendingSingleMsgs.containsKey(nodeId)) {
+                            log.info("New coordinator sends request [ver=" + 
initialVersion() +
+                                ", node=" + nodeId + ']');
+
+                            cctx.io().send(nodeId, req, SYSTEM_POOL);
+                        }
+                    }
+                    catch (ClusterTopologyCheckedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Node left during partition exchange 
[nodeId=" + nodeId +
+                                ", exchId=" + exchId + ']');
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to request partitions from node: 
" + nodeId, e);
+                    }
+                }
+
+                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : 
pendingSingleMsgs.entrySet()) {
+                    log.info("New coordinator process pending message [ver=" + 
initialVersion() +
+                        ", node=" + m.getKey() + ']');
+
+                    processSingleMessage(m.getKey(), m.getValue());
+                }
+            }
+        }
+
+        if (allRcvd) {
             awaitSingleMapUpdates();
 
             onAllReceived();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 4c98742..4df6d67 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -94,6 +94,9 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     @GridDirectCollection(Integer.class)
     private Collection<Integer> grpsAffRequest;
 
+    /** */
+    private GridDhtPartitionsFullMessage finishMsg;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -117,6 +120,14 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         this.compress = compress;
     }
 
+    /**
+     * @param finishMsg Full message this node received from the previous coordinator,
+     *      {@code null} if it did not receive one.
+     */
+    public void finishMessage(GridDhtPartitionsFullMessage finishMsg) {
+        this.finishMsg = finishMsg;
+    }
+
+    /**
+     * @return Full message this node received from the previous coordinator,
+     *      {@code null} if it did not receive one.
+     */
+    public GridDhtPartitionsFullMessage finishMessage() {
+        return finishMsg;
+    }
+
     /**
      * @param grpsAffRequest
      */
@@ -394,24 +405,30 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, 
MessageCollectionItemType.INT))
+                if (!writer.writeMessage("finishMsg", finishMsg))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, 
MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("partHistCntrsBytes", 
partHistCntrsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 11:
+                if (!writer.writeByteArray("partHistCntrsBytes", 
partHistCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -458,7 +475,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 8:
-                grpsAffRequest = reader.readCollection("grpsAffRequest", 
MessageCollectionItemType.INT);
+                finishMsg = reader.readMessage("finishMsg");
 
                 if (!reader.isLastRead())
                     return false;
@@ -466,7 +483,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 9:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                grpsAffRequest = reader.readCollection("grpsAffRequest", 
MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -474,7 +491,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 10:
-                partHistCntrsBytes = 
reader.readByteArray("partHistCntrsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -482,6 +499,14 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 11:
+                partHistCntrsBytes = 
reader.readByteArray("partHistCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -501,7 +526,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 13;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
new file mode 100644
index 0000000..4f9eee3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * Initializes state of a node which became exchange coordinator after the previous
+ * coordinator failed.
+ * <p>
+ * The future is done when coordinator caches initialization (if any) is finished and every
+ * remote node that was alive at the moment of {@link #init} has either replied to the
+ * restore-state {@link GridDhtPartitionsSingleRequest} or left the cluster.
+ */
+public class InitNewCoordinatorFuture extends GridCompoundFuture {
+    /** Full message some node received from the previous coordinator, {@code null} if nobody reported one. */
+    private GridDhtPartitionsFullMessage fullMsg;
+
+    /** IDs of remote nodes whose single messages are still awaited (modified under {@code this} lock). */
+    private final Set<UUID> awaited = new HashSet<>();
+
+    /** Single messages received from remote nodes (modified under {@code this} lock). */
+    private final Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = new HashMap<>();
+
+    /** Completed when no more single messages are awaited, {@code null} if there was nothing to await. */
+    private GridFutureAdapter restoreStateFut;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Whether the new coordinator restores exchange state using messages collected by this future. */
+    // TODO IGNITE-5578 backward compatibility
+    private boolean restoreState = true;
+
+    /**
+     * @return {@code True} if the new coordinator should restore exchange state
+     *      using messages collected by this future.
+     */
+    public boolean restoreState() {
+        return restoreState;
+    }
+
+    /**
+     * Starts coordinator caches initialization and sends restore-state partitions requests
+     * to all alive remote nodes of the exchange topology.
+     *
+     * @param exchFut Current exchange future.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void init(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
+        GridCacheSharedContext cctx = exchFut.sharedContext();
+
+        log = cctx.logger(getClass());
+
+        boolean newAff = exchFut.localJoinExchange();
+
+        IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(exchFut, newAff);
+
+        if (fut != null)
+            add(fut);
+
+        DiscoCache discoCache = exchFut.discoCache();
+
+        List<ClusterNode> nodes = new ArrayList<>();
+
+        synchronized (this) {
+            for (ClusterNode node : discoCache.allNodes()) {
+                // Nodes which already left will never reply, do not wait for them.
+                if (!node.isLocal() && cctx.discovery().alive(node)) {
+                    awaited.add(node.id());
+
+                    nodes.add(node);
+                }
+            }
+
+            // restoreStateFut is created under the same lock that fills 'awaited', so it is
+            // non-null whenever onMessage/onNodeLeft empty a non-empty 'awaited' set.
+            if (!awaited.isEmpty()) {
+                restoreStateFut = new GridFutureAdapter();
+
+                add(restoreStateFut);
+            }
+        }
+
+        if (!nodes.isEmpty()) {
+            // TODO IGNITE-5578: merged nodes.
+            GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchFut.exchangeId());
+
+            req.restoreState(true);
+
+            for (ClusterNode node : nodes) {
+                try {
+                    cctx.io().send(node, req, GridIoPolicy.SYSTEM_POOL);
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send partitions request, node failed: " + node);
+
+                    // Node failed before it could reply, stop waiting for it.
+                    onNodeLeft(node.id());
+                }
+            }
+        }
+
+        markInitialized();
+    }
+
+    /**
+     * @return Received messages.
+     */
+    Map<ClusterNode, GridDhtPartitionsSingleMessage> messages() {
+        return msgs;
+    }
+
+    /**
+     * @return Full message if some of the nodes received it from the previous coordinator,
+     *      {@code null} otherwise.
+     */
+    GridDhtPartitionsFullMessage fullMessage() {
+        return fullMsg;
+    }
+
+    /**
+     * Handles a restore-state single message from a remote node.
+     *
+     * @param node Node.
+     * @param msg Message.
+     */
+    public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+        assert msg.restoreState() : msg;
+
+        boolean done = false;
+
+        synchronized (this) {
+            // Messages from nodes which are not awaited (or already replied) are ignored.
+            if (awaited.remove(node.id())) {
+                GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
+
+                if (fullMsg0 != null) {
+                    assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
+
+                    fullMsg = fullMsg0;
+                }
+
+                msgs.put(node, msg);
+
+                done = awaited.isEmpty();
+            }
+        }
+
+        if (done)
+            restoreStateFut.onDone();
+    }
+
+    /**
+     * Stops waiting for a message from the failed node.
+     *
+     * @param nodeId Failed node ID.
+     */
+    public void onNodeLeft(UUID nodeId) {
+        boolean done;
+
+        synchronized (this) {
+            done = awaited.remove(nodeId) && awaited.isEmpty();
+        }
+
+        if (done)
+            restoreStateFut.onDone();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 3e20be5..3d10b31 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
@@ -37,6 +39,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -119,6 +122,58 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         return ccfg;
     }
 
+    // TODO IGNITE-5578 joined merged node failed (client/server).
+    // TODO IGNITE-5578 random topology changes, random delay for exchange 
messages.
+    // TODO IGNITE-5578 check exchanges/affinity consistency.
+
+    /**
+     * Concurrently joins server nodes only and checks caches.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartServers() throws Exception {
+        final boolean startClients = false;
+
+        concurrentStart(startClients);
+    }
+
+    /**
+     * Concurrently joins a random mix of server and client nodes and checks caches.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartServersAndClients() throws Exception {
+        final boolean startClients = true;
+
+        concurrentStart(startClients);
+    }
+
+    /**
+     * Repeatedly starts grid 0, then concurrently starts more nodes in several threads,
+     * checking caches on every started node and finally on the whole cluster.
+     *
+     * @param withClients If {@code true} also starts client nodes.
+     * @throws Exception If failed.
+     */
+    private void concurrentStart(final boolean withClients) throws Exception {
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            startGrid(0);
+
+            // Grid 0 is already running: concurrently started nodes take indexes 1, 2, ...
+            final AtomicInteger idx = new AtomicInteger(1);
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    // Randomly request client or server mode for the node started below.
+                    if (withClients)
+                        client.set(ThreadLocalRandom.current().nextBoolean());
+
+                    Ignite node = startGrid(idx.getAndIncrement());
+
+                    checkNodeCaches(node);
+
+                    return null;
+                }
+            }, 10, "start-node");
+
+            fut.get();
+
+            checkCaches();
+
+            // TODO: stop by one, check caches - in all tests.
+            stopAllGrids();
+        }
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -242,15 +297,89 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testNoMergeJoinExchangeCoordinatorChange1() throws Exception {
-        mergeJoinExchangeCoordinatorChange(4, 
CoordinatorChangeMode.NEW_CRD_RCDV);
+    public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception 
{
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            exchangeCoordinatorChangeNoMerge(4, true, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception 
{
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            exchangeCoordinatorChangeNoMerge(8, true, mode);
+
+            stopAllGrids();
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void mergeJoinExchangeCoordinatorChange(int srvs, 
CoordinatorChangeMode mode) throws Exception {
-        log.info("mergeJoinExchangeCoordinatorChange [nodes=" + srvs + ", 
mode=" + mode + ']');
+    public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception 
{
+        for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) {
+            exchangeCoordinatorChangeNoMerge(5, false, mode);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Stops a node of an 8-server cluster in every {@link CoordinatorChangeMode} scenario.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception {
+        CoordinatorChangeMode[] modes = CoordinatorChangeMode.values();
+
+        for (int i = 0; i < modes.length; i++) {
+            exchangeCoordinatorChangeNoMerge(8, false, modes[i]);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Joins two nodes concurrently to a 4-server cluster, holds back the full message for
+     * topology version 5 so that of nodes 1-5 only node 1 receives it, then stops the coordinator
+     * and checks caches.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMergeExchangeCoordinatorChange() throws Exception {
+        testSpi = true;
+
+        final int srvs = 4;
+
+        Ignite crd = startGrids(srvs);
+
+        // Presumably makes the coordinator wait for version 6 so both joins are merged — confirm.
+        mergeExchangeWaitVersion(crd, 6);
+
+        final AtomicInteger nextIdx = new AtomicInteger(srvs);
+
+        CountDownLatch rcvLatch = blockExchangeFinish(crd, 5, F.asList(2, 3, 4, 5), F.asList(1));
+
+        IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int nodeIdx = nextIdx.getAndIncrement();
+
+                startGrid(nodeIdx);
+
+                return null;
+            }
+        }, 2, "start-node");
+
+        boolean rcvd = rcvLatch == null || rcvLatch.await(5, TimeUnit.SECONDS);
+
+        if (!rcvd)
+            fail("Failed to wait for expected messages.");
+
+        stopGrid(0);
+
+        startFut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @param srvs Number of servers.
+     * @param join If {@code true} starts new node, otherwise stops node.
+     * @param mode Tested scenario.
+     * @throws Exception If failed.
+     */
+    private void exchangeCoordinatorChangeNoMerge(int srvs, final boolean 
join, CoordinatorChangeMode mode) throws Exception {
+        log.info("Test mergeJoinExchangeCoordinatorChange [nodes=" + srvs + ", 
mode=" + mode + ']');
 
         testSpi = true;
 
@@ -262,7 +391,10 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
 
         IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
             @Override public Object call() throws Exception {
-                startGrid(nodes);
+                if (join)
+                    startGrid(nodes);
+                else
+                    stopGrid(nodes - 1);
 
                 return null;
             }
@@ -280,13 +412,19 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         checkCaches();
     }
 
+    /**
+     * @param srvs Number of server nodes.
+     * @param mode Test scenario.
+     * @return Awaited state latch.
+     * @throws Exception If failed.
+     */
     private CountDownLatch blockExchangeFinish(int srvs, CoordinatorChangeMode 
mode) throws Exception {
         Ignite crd = ignite(0);
 
         long topVer = srvs + 1;
 
         switch (mode) {
-            case NON_RCVD: {
+            case NOBODY_RCVD: {
                 blockExchangeFinish(crd, topVer);
 
                 break;
@@ -314,9 +452,9 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * @param srvs
-     * @param waitNodes
-     * @return
+     * @param srvs Number of servers.
+     * @param waitNodes Nodes which should receive message.
+     * @return Blocked nodes indexes.
      */
     private List<Integer> blockNodes(int srvs, List<Integer> waitNodes) {
         List<Integer> block = new ArrayList<>();
@@ -330,20 +468,6 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     *
-     */
-    enum CoordinatorChangeMode {
-        /** */
-        NON_RCVD,
-
-        /** */
-        NEW_CRD_RCDV,
-
-        /** */
-        NON_CRD_RCVD
-    }
-
-    /**
      * @param crd Exchange coordinator.
      * @param topVer Exchange topology version.
      */
@@ -366,6 +490,9 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     /**
      * @param crd Exchange coordinator.
      * @param topVer Exchange topology version.
+     * @param blockNodes Nodes which do not receive messages.
+     * @param waitMsgNodes Nodes which should receive messages.
+     * @return Awaited state latch.
      */
     private CountDownLatch blockExchangeFinish(Ignite crd,
         long topVer,
@@ -385,7 +512,7 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
                 if (msg instanceof GridDhtPartitionsFullMessage) {
                     GridDhtPartitionsFullMessage msg0 = 
(GridDhtPartitionsFullMessage)msg;
 
-                    if (msg0.exchangeId() == null || 
!msg0.exchangeId().topologyVersion().equals(topVer0))
+                    if (msg0.exchangeId() == null || 
msg0.exchangeId().topologyVersion().compareTo(topVer0) < 0)
                         return false;
 
                     String name = 
node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);
@@ -426,8 +553,12 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void checkCaches() throws Exception {
+        checkAffinity();
+
         checkCaches0();
 
+        checkAffinity();
+
         awaitPartitionMapExchange();
 
         checkCaches0();
@@ -441,25 +572,74 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
 
         assertTrue(nodes.size() > 0);
 
+        for (Ignite node : nodes)
+            checkNodeCaches(node);
+    }
+
+    /**
+     * Checks that every cache has the same affinity assignment on all running nodes for the
+     * affinity version that is ready on the oldest server node (the exchange coordinator).
+     *
+     * @throws Exception If failed.
+     */
+    private void checkAffinity() throws Exception {
+        List<Ignite> nodes = G.allGrids();
+
+        ClusterNode crdNode = null;
+
+        for (Ignite node : nodes) {
+            // Only server nodes can be exchange coordinator; isClientMode() may return null.
+            if (Boolean.TRUE.equals(node.configuration().isClientMode()))
+                continue;
+
+            ClusterNode locNode = node.cluster().localNode();
+
+            if (crdNode == null || locNode.order() < crdNode.order())
+                crdNode = locNode;
+        }
+
+        assertNotNull("Failed to find coordinator node [nodes=" + nodes.size() + ']', crdNode);
+
+        AffinityTopologyVersion topVer = ((IgniteKernal)grid(crdNode)).
+            context().cache().context().exchange().readyAffinityVersion();
+
+        Map<String, List<List<ClusterNode>>> affMap = new HashMap<>();
+
+        for (Ignite node : nodes) {
+            IgniteKernal node0 = (IgniteKernal)node;
+
+            for (IgniteInternalCache cache : node0.context().cache().caches()) {
+                List<List<ClusterNode>> aff = affMap.get(cache.name());
+                List<List<ClusterNode>> aff0 = cache.context().affinity().assignments(topVer);
+
+                if (aff != null) {
+                    assertEquals("Affinity mismatch [node=" + node.name() +
+                        ", cache=" + cache.name() +
+                        ", topVer=" + topVer + ']', aff, aff0);
+                }
+                else
+                    affMap.put(cache.name(), aff0);
+            }
+        }
+    }
+
+    /**
+     * @param node Node.
+     */
+    private void checkNodeCaches(Ignite node) {
         String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"};
 
         ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-        for (Ignite node : nodes) {
-            for (String cacheName : cacheNames) {
-                IgniteCache<Object, Object> cache = node.cache(cacheName);
+        for (String cacheName : cacheNames) {
+            IgniteCache<Object, Object> cache = node.cache(cacheName);
 
-                assertNotNull("No cache [node=" + node.name() +
-                    ", order=" + node.cluster().localNode().order() +
-                    ", cache=" + cacheName + ']', cache);
+            // TODO: multithreaded, putAll and transactions.
 
-                for (int i = 0; i < 10; i++) {
-                    Integer key = rnd.nextInt(100_000);
+            assertNotNull("No cache [node=" + node.name() +
+                ", client=" + node.configuration().isClientMode() +
+                ", order=" + node.cluster().localNode().order() +
+                ", cache=" + cacheName + ']', cache);
 
-                    cache.put(key, i);
+            String err = "Invalid value [node=" + node.name() +
+                ", client=" + node.configuration().isClientMode() +
+                ", order=" + node.cluster().localNode().order() +
+                ", cache=" + cacheName + ']';
 
-                    assertEquals(i, cache.get(key));
-                }
+            for (int i = 0; i < 10; i++) {
+                Integer key = rnd.nextInt(100_000);
+
+                cache.put(key, i);
+
+                assertEquals(err, i, cache.get(key));
             }
         }
     }
@@ -480,4 +660,25 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
 
         assertTrue(wait);
     }
+
+    /**
+     * Scenarios of exchange coordinator failure used by the coordinator change tests,
+     * distinguished by which nodes received the full message before the coordinator failed.
+     */
+    enum CoordinatorChangeMode {
+        /**
+         * Coordinator failed, did not send full message.
+         */
+        NOBODY_RCVD,
+
+        /**
+         * Coordinator failed, but new coordinator received full message
+         * and finished exchange.
+         * <p>
+         * Note: the constant name is spelled {@code RCDV} (sic); it is left as is because
+         * it may be referenced elsewhere.
+         */
+        NEW_CRD_RCDV,
+
+        /**
+         * Coordinator failed, but one of servers (not new coordinator) received full message.
+         */
+        NON_CRD_RCVD
+    }
 }

Reply via email to