Repository: ignite
Updated Branches:
  refs/heads/master 8d3e37bf2 -> 66fcde3d0


IGNITE-9227 Fixed missing reply to a single message during coordinator failover. Fixes #4518.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66fcde3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66fcde3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66fcde3d

Branch: refs/heads/master
Commit: 66fcde3d0acb88b028024e4a62dc4b2f9ddfe534
Parents: 8d3e37b
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Thu Aug 16 18:39:49 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Thu Aug 16 18:41:12 2018 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  17 +-
 .../GridCachePartitionExchangeManager.java      |  34 ++-
 .../preloader/CacheGroupAffinityMessage.java    |  17 +-
 .../GridDhtPartitionsExchangeFuture.java        | 215 +++++++++++--------
 .../preloader/GridDhtPartitionsFullMessage.java |   4 +-
 .../cache/persistence/file/FilePageStore.java   |   2 +-
 ...rtitionsExchangeCoordinatorFailoverTest.java | 155 +++++++++++++
 .../distributed/CacheExchangeMergeTest.java     |   2 -
 .../testsuites/IgniteCacheTestSuite6.java       |   3 +-
 9 files changed, 349 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 3862523..41f7c63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -70,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -1383,9 +1382,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 
-        final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = 
msg.joinedNodeAffinity();
+        final Map<Integer, CacheGroupAffinityMessage> receivedAff = 
msg.joinedNodeAffinity();
 
-        assert F.isEmpty(affReq) || (!F.isEmpty(joinedNodeAff) && 
joinedNodeAff.size() >= affReq.size()) : msg;
+        assert F.isEmpty(affReq) || (!F.isEmpty(receivedAff) && 
receivedAff.size() >= affReq.size())
+            : ("Requested and received affinity are different " +
+                "[requestedCnt=" + (affReq != null ? affReq.size() : "none") +
+                ", receivedCnt=" + (receivedAff != null ? receivedAff.size() : 
"none") +
+                ", msg=" + msg + "]");
 
         forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
@@ -1398,7 +1401,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 if (affReq != null && affReq.contains(aff.groupId())) {
                     assert 
AffinityTopologyVersion.NONE.equals(aff.lastVersion());
 
-                    CacheGroupAffinityMessage affMsg = 
joinedNodeAff.get(aff.groupId());
+                    CacheGroupAffinityMessage affMsg = 
receivedAff.get(aff.groupId());
 
                     assert affMsg != null;
 
@@ -1774,8 +1777,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      * @return Future completed when caches initialization is done.
      */
-    public IgniteInternalFuture<?> initCoordinatorCaches(final 
GridDhtPartitionsExchangeFuture fut,
-        final boolean newAff) throws IgniteCheckedException {
+    public IgniteInternalFuture<?> initCoordinatorCaches(
+        final GridDhtPartitionsExchangeFuture fut,
+        final boolean newAff
+    ) throws IgniteCheckedException {
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new 
ArrayList<>();
 
         final AffinityTopologyVersion topVer = fut.initialVersion();

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f65d338..d5eeaeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -116,6 +116,7 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -330,6 +331,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
             new MessageHandler<GridDhtPartitionsSingleMessage>() {
                 @Override public void onMessage(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
+                    GridDhtPartitionExchangeId exchangeId = msg.exchangeId();
+
+                    if (exchangeId != null) {
+                        GridDhtPartitionsExchangeFuture fut = 
exchangeFuture(exchangeId);
+
+                        boolean fastReplied = 
fut.fastReplyOnSingleMessage(node, msg);
+
+                        if (fastReplied) {
+                            if (log.isInfoEnabled())
+                                log.info("Fast replied to single message " +
+                                    "[exchId=" + exchangeId + ", nodeId=" + 
node.id() + "]");
+
+                            return;
+                        }
+                    }
+
                     if (!crdInitFut.isDone() && !msg.restoreState()) {
                         GridDhtPartitionExchangeId exchId = msg.exchangeId();
 
@@ -1363,6 +1380,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Gets exchange future by exchange id.
+     *
+     * @param exchId Exchange id.
+     */
+    private GridDhtPartitionsExchangeFuture exchangeFuture(@NotNull 
GridDhtPartitionExchangeId exchId) {
+        return exchangeFuture(exchId, null, null, null, null);
+    }
+
+    /**
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
      * @param cache Discovery data cache.
@@ -1370,11 +1396,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
-    private GridDhtPartitionsExchangeFuture 
exchangeFuture(GridDhtPartitionExchangeId exchId,
+    private GridDhtPartitionsExchangeFuture exchangeFuture(
+        @NotNull GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt,
         @Nullable DiscoCache cache,
         @Nullable ExchangeActions exchActions,
-        @Nullable CacheAffinityChangeMessage affChangeMsg) {
+        @Nullable CacheAffinityChangeMessage affChangeMsg
+    ) {
         GridDhtPartitionsExchangeFuture fut;
 
         GridDhtPartitionsExchangeFuture old = exchFuts.addx(
@@ -1571,7 +1599,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     scheduleResendPartitions();
             }
             else {
-                GridDhtPartitionsExchangeFuture exchFut = 
exchangeFuture(msg.exchangeId(), null, null, null, null);
+                GridDhtPartitionsExchangeFuture exchFut = 
exchangeFuture(msg.exchangeId());
 
                 if (log.isDebugEnabled())
                     log.debug("Notifying exchange future about single message: 
" + exchFut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 8a1ffb4..7da4051 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.F;
@@ -144,7 +145,8 @@ public class CacheGroupAffinityMessage implements Message {
         GridCacheSharedContext cctx,
         AffinityTopologyVersion topVer,
         Collection<Integer> affReq,
-        @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff) {
+        @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff
+    ) {
         assert !F.isEmpty(affReq) : affReq;
 
         if (cachesAff == null)
@@ -152,7 +154,18 @@ public class CacheGroupAffinityMessage implements Message {
 
         for (Integer grpId : affReq) {
             if (!cachesAff.containsKey(grpId)) {
-                GridAffinityAssignmentCache aff = 
cctx.affinity().affinity(grpId);
+                GridAffinityAssignmentCache aff = 
cctx.affinity().groupAffinity(grpId);
+
+                // If no coordinator group holder on the node, try fetch 
affinity from existing cache group.
+                if (aff == null) {
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+                    assert grp != null : "No cache group holder or cache group 
to create AffinityMessage"
+                        + ". Requested group id: " + grpId
+                        + ". Topology version: " + topVer;
+
+                    aff = grp.affinity();
+                }
 
                 List<List<ClusterNode>> assign = aff.readyAssignments(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 28dc8f3..fbd3264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -24,8 +24,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -37,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -1608,74 +1611,90 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param msg Message to send.
+     * @param fullMsg Message to send.
      * @param nodes Nodes.
      * @param mergedJoinExchMsgs Messages received from merged 'join node' 
exchanges.
-     * @param joinedNodeAff Affinity if was requested by some nodes.
+     * @param affinityForJoinedNodes Affinity if was requested by some nodes.
      */
     private void sendAllPartitions(
-        GridDhtPartitionsFullMessage msg,
+        GridDhtPartitionsFullMessage fullMsg,
         Collection<ClusterNode> nodes,
         Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs,
-        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
-        boolean singleNode = nodes.size() == 1;
-
-        GridDhtPartitionsFullMessage joinedNodeMsg = null;
-
+        Map<Integer, CacheGroupAffinityMessage> affinityForJoinedNodes
+    ) {
         assert !nodes.contains(cctx.localNode());
 
         if (log.isDebugEnabled()) {
             log.debug("Sending full partition map [nodeIds=" + 
F.viewReadOnly(nodes, F.node2id()) +
-                ", exchId=" + exchId + ", msg=" + msg + ']');
+                ", exchId=" + exchId + ", msg=" + fullMsg + ']');
         }
 
-        for (ClusterNode node : nodes) {
-            GridDhtPartitionsFullMessage sndMsg = msg;
-
-            if (joinedNodeAff != null) {
-                if (singleNode)
-                    msg.joinedNodeAffinity(joinedNodeAff);
-                else {
-                    GridDhtPartitionsSingleMessage singleMsg = 
msgs.get(node.id());
+        // Find any single message with affinity request. This request exists 
only for newly joined nodes.
+        Optional<GridDhtPartitionsSingleMessage> singleMsgWithAffinityReq = 
nodes.stream()
+            .flatMap(node -> Optional.ofNullable(msgs.get(node.id()))
+                .filter(singleMsg -> singleMsg.cacheGroupsAffinityRequest() != 
null)
+                .map(Stream::of)
+                .orElse(Stream.empty()))
+            .findAny();
+
+        // Prepare full message for newly joined nodes with affinity request.
+        final GridDhtPartitionsFullMessage fullMsgWithAffinity = 
singleMsgWithAffinityReq
+            .filter(singleMessage -> affinityForJoinedNodes != null)
+            .map(singleMessage -> 
fullMsg.copy().joinedNodeAffinity(affinityForJoinedNodes))
+            .orElse(null);
+
+        // Prepare and send full messages for given nodes.
+        nodes.stream()
+            .map(node -> {
+                // No joined nodes, just send a regular full message.
+                if (fullMsgWithAffinity == null)
+                    return new T2<>(node, fullMsg);
+
+                return new T2<>(
+                    node,
+                    // If single message contains affinity request, use 
special full message for such single messages.
+                    Optional.ofNullable(msgs.get(node.id()))
+                        .filter(singleMsg -> 
singleMsg.cacheGroupsAffinityRequest() != null)
+                        .map(singleMsg -> fullMsgWithAffinity)
+                        .orElse(fullMsg)
+                );
+            })
+            .map(nodeAndMsg -> {
+                ClusterNode node = nodeAndMsg.get1();
+                GridDhtPartitionsFullMessage fullMsgToSend = nodeAndMsg.get2();
+
+                // If exchange has merged, use merged version of exchange id.
+                GridDhtPartitionExchangeId sndExchId = mergedJoinExchMsgs != 
null
+                    ? Optional.ofNullable(mergedJoinExchMsgs.get(node.id()))
+                        .map(GridDhtPartitionsAbstractMessage::exchangeId)
+                        .orElse(exchangeId())
+                    : exchangeId();
 
-                    if (singleMsg != null && 
singleMsg.cacheGroupsAffinityRequest() != null) {
-                        if (joinedNodeMsg == null) {
-                            joinedNodeMsg = msg.copy();
+                if (sndExchId != null && !sndExchId.equals(exchangeId())) {
+                    GridDhtPartitionsFullMessage fullMsgWithUpdatedExchangeId 
= fullMsgToSend.copy();
 
-                            joinedNodeMsg.joinedNodeAffinity(joinedNodeAff);
-                        }
+                    fullMsgWithUpdatedExchangeId.exchangeId(sndExchId);
 
-                        sndMsg = joinedNodeMsg;
-                    }
+                    return new T2<>(node, fullMsgWithUpdatedExchangeId);
                 }
-            }
-
-            try {
-                GridDhtPartitionExchangeId sndExchId = exchangeId();
 
-                if (mergedJoinExchMsgs != null) {
-                    GridDhtPartitionsSingleMessage mergedMsg = 
mergedJoinExchMsgs.get(node.id());
+                return new T2<>(node, fullMsgToSend);
+            })
+            .forEach(nodeAndMsg -> {
+                ClusterNode node = nodeAndMsg.get1();
+                GridDhtPartitionsFullMessage fullMsgToSend = nodeAndMsg.get2();
 
-                    if (mergedMsg != null)
-                        sndExchId = mergedMsg.exchangeId();
+                try {
+                    cctx.io().send(node, fullMsgToSend, SYSTEM_POOL);
                 }
-
-                if (sndExchId != null && !sndExchId.equals(exchangeId())) {
-                    sndMsg = sndMsg.copy();
-
-                    sndMsg.exchangeId(sndExchId);
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send partitions, node failed: " + 
node);
                 }
-
-                cctx.io().send(node, sndMsg, SYSTEM_POOL);
-            }
-            catch (ClusterTopologyCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partitions, node failed: " + 
node);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send partitions [node=" + node + ']', 
e);
-            }
-        }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send partitions [node=" + node + 
']', e);
+                }
+            });
     }
 
     /**
@@ -2213,6 +2232,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Tries to fast reply with {@link GridDhtPartitionsFullMessage} on 
received single message
+     * in case of exchange future has already completed.
+     *
+     * @param node Cluster node which sent single message.
+     * @param msg Single message.
+     * @return {@code true} if fast reply succeed.
+     */
+    public boolean fastReplyOnSingleMessage(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
+        GridDhtPartitionsExchangeFuture futToFastReply = this;
+
+        ExchangeLocalState currState;
+
+        synchronized (mux) {
+            currState = state;
+
+            if (currState == ExchangeLocalState.MERGED)
+                futToFastReply = mergedWith;
+        }
+
+        if (currState == ExchangeLocalState.DONE)
+            futToFastReply.processSingleMessage(node.id(), msg);
+        else if (currState == ExchangeLocalState.MERGED)
+            futToFastReply.processMergedMessage(node, msg);
+
+        return currState == ExchangeLocalState.MERGED || currState == 
ExchangeLocalState.DONE;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Client's message.
      */
@@ -2931,9 +2978,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 synchronized (mux) {
                     srvNodes.remove(cctx.localNode());
 
-                    nodes = U.newHashSet(srvNodes.size());
-
-                    nodes.addAll(srvNodes);
+                    nodes = new LinkedHashSet<>(srvNodes);
 
                     mergedJoinExchMsgs0 = mergedJoinExchMsgs;
 
@@ -3094,48 +3139,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void sendAllPartitionsToNode(FinishState finishState, 
GridDhtPartitionsSingleMessage msg, UUID nodeId) {
         ClusterNode node = cctx.node(nodeId);
 
-        if (node != null) {
-            GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy();
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partitions, node failed: " + nodeId);
 
-            Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+            return;
+        }
 
-            if (affReq != null) {
-                Map<Integer, CacheGroupAffinityMessage> aff = 
CacheGroupAffinityMessage.createAffinityMessages(
-                    cctx,
-                    finishState.resTopVer,
-                    affReq,
-                    null);
+        GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy();
 
-                fullMsg.joinedNodeAffinity(aff);
-            }
+        Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-            if (!fullMsg.exchangeId().equals(msg.exchangeId())) {
-                fullMsg = fullMsg.copy();
+        if (affReq != null) {
+            Map<Integer, CacheGroupAffinityMessage> aff = 
CacheGroupAffinityMessage.createAffinityMessages(
+                cctx,
+                finishState.resTopVer,
+                affReq,
+                null);
 
-                fullMsg.exchangeId(msg.exchangeId());
-            }
+            fullMsg.joinedNodeAffinity(aff);
+        }
 
-            try {
-                cctx.io().send(node, fullMsg, SYSTEM_POOL);
+        if (!fullMsg.exchangeId().equals(msg.exchangeId())) {
+            fullMsg = fullMsg.copy();
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Full message was sent to node: " +
-                        node +
-                        ", fullMsg: " + fullMsg
-                    );
-                }
-            }
-            catch (ClusterTopologyCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partitions, node failed: " + 
node);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send partitions [node=" + node + ']', 
e);
-            }
+            fullMsg.exchangeId(msg.exchangeId());
         }
-        else if (log.isDebugEnabled())
-            log.debug("Failed to send partitions, node failed: " + nodeId);
 
+        try {
+            cctx.io().send(node, fullMsg, SYSTEM_POOL);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Full message was sent to node: " +
+                    node +
+                    ", fullMsg: " + fullMsg
+                );
+            }
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partitions, node failed: " + node);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partitions [node=" + node + ']', e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 5962468..ab45d8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -218,8 +218,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      * @param joinedNodeAff Caches affinity for joining nodes.
      */
-    void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> 
joinedNodeAff) {
+    GridDhtPartitionsFullMessage joinedNodeAffinity(Map<Integer, 
CacheGroupAffinityMessage> joinedNodeAff) {
         this.joinedNodeAff = joinedNodeAff;
+
+        return this;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index ca31f05..c4f90e5 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -165,7 +165,7 @@ public class FilePageStore implements PageStore {
         try {
             ByteBuffer hdr = header(type, dbCfg.getPageSize());
 
-        fileIO.writeFully(hdr);
+            fileIO.writeFully(hdr);
 
             //there is 'super' page in every file
             return headerSize() + dbCfg.getPageSize();

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
new file mode 100644
index 0000000..a2adcf7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Advanced coordinator failure scenarios during PME.
+ */
+public class PartitionsExchangeCoordinatorFailoverTest extends 
GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        IgnitePredicate<ClusterNode> nodeFilter = node -> 
node.consistentId().equals(igniteInstanceName);
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration("cache-" + igniteInstanceName)
+                .setBackups(1)
+                .setNodeFilter(nodeFilter)
+                .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60 * 1000L;
+    }
+
+    /**
+     * Tests that new coordinator is able to finish old exchanges in case of 
in-complete coordinator initialization.
+     */
+    public void testNewCoordinatorCompletedExchange() throws Exception {
+        IgniteEx crd = (IgniteEx) startGrid("crd");
+
+        IgniteEx newCrd = startGrid(1);
+
+        crd.cluster().active(true);
+
+        // 3 node join topology version.
+        AffinityTopologyVersion joinThirdNodeVer = new 
AffinityTopologyVersion(3, 0);
+
+        // 4 node join topology version.
+        AffinityTopologyVersion joinFourNodeVer = new 
AffinityTopologyVersion(4, 0);
+
+        // Block FullMessage for newly joined nodes.
+        TestRecordingCommunicationSpi spi = 
TestRecordingCommunicationSpi.spi(crd);
+
+        final CountDownLatch sendFullMsgLatch = new CountDownLatch(1);
+
+        // Delay sending full message to newly joined nodes.
+        spi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionsFullMessage && node.order() > 
2) {
+                try {
+                    sendFullMsgLatch.await();
+                }
+                catch (Throwable ignored) { }
+
+                return true;
+            }
+
+            return false;
+        });
+
+        IgniteInternalFuture joinTwoNodesFut = GridTestUtils.runAsync(() -> 
startGridsMultiThreaded(2, 2));
+
+        GridCachePartitionExchangeManager exchangeMgr = 
newCrd.context().cache().context().exchange();
+
+        // Wait till new coordinator finishes third node join exchange.
+        GridTestUtils.waitForCondition(
+            () -> 
exchangeMgr.readyAffinityVersion().compareTo(joinThirdNodeVer) >= 0,
+            getTestTimeout()
+        );
+
+        IgniteInternalFuture startLastNodeFut = GridTestUtils.runAsync(() -> 
startGrid(5));
+
+        // Wait till new coordinator starts third node join exchange.
+        GridTestUtils.waitForCondition(
+            () -> 
exchangeMgr.lastTopologyFuture().initialVersion().compareTo(joinFourNodeVer) >= 
0,
+            getTestTimeout()
+        );
+
+        IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> 
stopGrid("crd", true, false));
+
+        // Magic sleep to make sure that coordinator stop process has started.
+        U.sleep(1000);
+
+        // Resume full messages sending to unblock coordinator stopping 
process.
+        sendFullMsgLatch.countDown();
+
+        // Coordinator stop should succeed.
+        stopCrdFut.get();
+
+        // Nodes join should succeed.
+        joinTwoNodesFut.get();
+
+        startLastNodeFut.get();
+
+        awaitPartitionMapExchange();
+
+        // Check that all caches are operable.
+        for (Ignite grid : G.allGrids()) {
+            IgniteCache cache = grid.cache("cache-" + 
grid.cluster().localNode().consistentId());
+
+            Assert.assertNotNull(cache);
+
+            cache.put(0, 0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index b255066..1183634 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -634,8 +634,6 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-9227";);
-
         cfgCache = false;
 
         final Ignite srv0 = startGrids(2);

http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index b52e339..f9e6b81 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import 
org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest;
+import 
org.apache.ignite.internal.processors.cache.PartitionsExchangeCoordinatorFailoverTest;
 import 
org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
 import 
org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
@@ -109,10 +110,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
 
         
suite.addTestSuite(IgniteExchangeLatchManagerCoordinatorFailTest.class);
 
+        suite.addTestSuite(PartitionsExchangeCoordinatorFailoverTest.class);
         suite.addTestSuite(CacheTryLockMultithreadedTest.class);
 
         //suite.addTestSuite(CacheClientsConcurrentStartTest.class);
-        //suite.addTestSuite(CacheTryLockMultithreadedTest.class);
         //suite.addTestSuite(GridCacheRebalancingOrderingTest.class);
         
//suite.addTestSuite(IgniteCacheClientMultiNodeUpdateTopologyLockTest.class);
 

Reply via email to