Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 3db3266c2 -> 63822921f


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/63822921
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/63822921
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/63822921

Branch: refs/heads/ignite-5578
Commit: 63822921f78fcd7af1690ab68cf7a5de418a97bc
Parents: 3db3266
Author: sboikov <[email protected]>
Authored: Thu Jul 20 16:07:34 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jul 20 16:07:34 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 551 ++++++++++++-------
 2 files changed, 353 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/63822921/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81e83c9..93b1729 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1431,7 +1431,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 null,
                 null);
 
-            exchFut.processSinglePartitionRequest(node, msg);
+            exchFut.onReceivePartitionRequest(node, msg);
         }
         finally {
             leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/63822921/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 17bea14..696dd2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -142,9 +142,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private ClusterNode crd;
 
-    /** */
-    private boolean crdReady;
-
     /** ExchangeFuture id. */
     private final GridDhtPartitionExchangeId exchId;
 
@@ -236,6 +233,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private final AtomicBoolean done = new AtomicBoolean();
 
     /** */
+    private ExchangeLocalState state;
+
+    /** */
     @GridToStringExclude
     private ExchangeContext exchCtx;
 
@@ -484,8 +484,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
+            assert state == null : state;
+
             if (crdNode)
-                crdReady = true;
+                state = ExchangeLocalState.CRD;
+            else
+                state = cctx.kernalContext().clientNode() ? 
ExchangeLocalState.CLIENT : ExchangeLocalState.SRV;
 
             exchLog.info("Started exchange init [topVer=" + topVer +
                 ", crd=" + crdNode +
@@ -1269,22 +1273,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param joinedNodeAff Affinity if was requested by some nodes.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes,
-        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff,
-        Map<Integer, CacheGroupAffinityMessage> idealAffDiff,
-        @Nullable GridDhtPartitionExchangeId msgExchId)
+    private void sendAllPartitions(
+        GridDhtPartitionsFullMessage msg,
+        Collection<ClusterNode> nodes,
+        Map<Integer, CacheGroupAffinityMessage> joinedNodeAff)
         throws IgniteCheckedException {
         boolean singleNode = nodes.size() == 1;
 
-        GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
-
-        if (exchCtx.mergeExchanges()) {
-            msg.resultTopologyVersion(exchCtx.events().topologyVersion());
-
-            if (exchCtx.events().serverLeft())
-                msg.idealAffinityDiff(idealAffDiff);
-        }
-
         GridDhtPartitionsFullMessage joinedNodeMsg = null;
 
         assert !nodes.contains(cctx.localNode());
@@ -1294,8 +1289,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 ", exchId=" + exchId + ", msg=" + msg + ']');
         }
 
-        msg.prepareMarshal(cctx);
-
         for (ClusterNode node : nodes) {
             GridDhtPartitionsFullMessage sndMsg = msg;
 
@@ -1318,9 +1311,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
 
             try {
-                GridDhtPartitionExchangeId sndExchId = msgExchId;
+                GridDhtPartitionExchangeId sndExchId = exchangeId();
 
-                if (sndExchId == null && mergedJoinExchMsgs != null) {
+                if (mergedJoinExchMsgs != null) {
                     GridDhtPartitionsSingleMessage mergedMsg = 
mergedJoinExchMsgs.get(node.id());
 
                     if (mergedMsg != null)
@@ -1533,32 +1526,44 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** */
     private GridDhtPartitionsSingleMessage pendingJoinMsg;
 
-    private boolean addMergedJoinExchange(UUID nodeId, 
GridDhtPartitionsSingleMessage msg) {
-        if (mergedJoinExchMsgs == null)
-            mergedJoinExchMsgs = new LinkedHashMap<>();
+    private boolean addMergedJoinExchange(ClusterNode node, @Nullable 
GridDhtPartitionsSingleMessage msg) {
+        assert Thread.holdsLock(this);
+        assert node != null;
+        assert state == ExchangeLocalState.CRD : state;
 
-        boolean wait = false;
+        UUID nodeId = node.id();
 
-        if (msg != null) {
-            log.info("Merge server join exchange, message received [curFut=" + 
topologyVersion() +
-                ", node=" + nodeId + ']');
+        boolean wait = false;
 
-            mergedJoinExchMsgs.put(nodeId, msg);
+        if (CU.clientNode(node)) {
+            if (msg != null)
+                waitAndReplyToClient(nodeId, msg);
         }
         else {
-            if (cctx.discovery().alive(nodeId)) {
-                log.info("Merge server join exchange, wait for message 
[curFut=" + topologyVersion() +
-                    ", node=" + nodeId + ']');
+            if (mergedJoinExchMsgs == null)
+                mergedJoinExchMsgs = new LinkedHashMap<>();
 
-                wait = true;
+            if (msg != null) {
+                log.info("Merge server join exchange, message received 
[curFut=" + topologyVersion() +
+                    ", node=" + nodeId + ']');
 
-                awaitMergedMsgs++;
+                mergedJoinExchMsgs.put(nodeId, msg);
             }
             else {
-                log.info("Merge server join exchange, awaited node left 
[curFut=" + topologyVersion() +
-                    ", node=" + nodeId + ']');
+                if (cctx.discovery().alive(nodeId)) {
+                    log.info("Merge server join exchange, wait for message 
[curFut=" + topologyVersion() +
+                        ", node=" + nodeId + ']');
+
+                    wait = true;
 
-                mergedJoinExchMsgs.put(nodeId, null);
+                    mergedJoinExchMsgs.put(nodeId, null);
+
+                    awaitMergedMsgs++;
+                }
+                else {
+                    log.info("Merge server join exchange, awaited node left 
[curFut=" + topologyVersion() +
+                        ", node=" + nodeId + ']');
+                }
             }
         }
 
@@ -1576,23 +1581,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             assert !isDone();
             assert !initFut.isDone();
             assert mergedWith == null;
+            assert state == null;
+
+            state = ExchangeLocalState.MERGED;
 
             mergedWith = fut;
 
             ClusterNode joinedNode = discoEvt.eventNode();
 
-            if (CU.clientNode(joinedNode)) {
-                if (pendingJoinMsg != null) {
-                    if (mergedJoinExchMsgs == null)
-                        mergedJoinExchMsgs = new LinkedHashMap<>();
-
-                    mergedJoinExchMsgs.put(joinedNode.id(), pendingJoinMsg);
-                }
-
-                wait = false;
-            }
-            else
-                wait = fut.addMergedJoinExchange(joinedNode.id(), 
pendingJoinMsg);
+            wait = fut.addMergedJoinExchange(joinedNode, pendingJoinMsg);
         }
 
         return wait;
@@ -1602,13 +1599,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param node
      * @param msg
      */
-    void onReceiveMerged(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
+    void processMergedMessage(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
         if (msg.client()) {
-            listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    sendAllPartitions(msg, node.id(), 0);
-                }
-            });
+            waitAndReplyToClient(node.id(), msg);
 
             return;
         }
@@ -1616,7 +1609,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         boolean done = false;
 
         synchronized (this) {
-            boolean process = mergedJoinExchMsgs != null && 
!mergedJoinExchMsgs.containsKey(node.id());
+            boolean process = mergedJoinExchMsgs != null &&
+                mergedJoinExchMsgs.containsKey(node.id()) &&
+                mergedJoinExchMsgs.get(node.id()) == null;
 
             log.info("Merge server join exchange, received message [curFut=" + 
topologyVersion() +
                 ", node=" + node.id() +
@@ -1644,68 +1639,82 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param msg Single partition info.
      */
     public void onReceiveSingleMessage(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
+        assert !node.isDaemon() : node;
         assert msg != null;
         assert exchId.equals(msg.exchangeId()) : msg;
+        assert !cctx.kernalContext().clientNode();
 
-        if (isDone()) {
-            if (log.isDebugEnabled())
-                log.debug("Received message for finished future (will reply 
only to sender) [msg=" + msg +
-                    ", fut=" + this + ']');
+        if (!msg.client()) {
+            assert msg.lastVersion() != null : msg;
 
-            if (!centralizedAff)
-                sendAllPartitions(msg, node.id(), 
cctx.gridConfig().getNetworkSendRetryCount());
+            updateLastVersion(msg.lastVersion());
         }
-        else {
-            if (!msg.client()) {
-                assert msg.lastVersion() != null : msg;
 
-                updateLastVersion(msg.lastVersion());
+        GridDhtPartitionsExchangeFuture mergedWith0 = null;
+
+        synchronized (this) {
+            if (state == ExchangeLocalState.MERGED) {
+                assert mergedWith != null;
+
+                mergedWith0 = mergedWith;
             }
+            else {
+                assert state != ExchangeLocalState.CLIENT;
 
-            GridDhtPartitionsExchangeFuture mergedWith0 = null;
+                if (exchangeId().isJoined() && 
node.id().equals(exchId.nodeId()))
+                    pendingJoinMsg = msg;
+            }
+        }
 
-            synchronized (this) {
-                if (mergedWith != null)
-                    mergedWith0 = mergedWith;
-                else {
-                    if (exchangeId().isJoined() && 
node.id().equals(exchId.nodeId()))
-                        pendingJoinMsg = msg;
+        if (mergedWith0 != null) {
+            mergedWith0.processMergedMessage(node, msg);
+
+            return;
+        }
+
+        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+            @Override public void apply(IgniteInternalFuture<Boolean> f) {
+                try {
+                    if (!f.get())
+                        return;
                 }
-            }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to initialize exchange future: " + 
this, e);
 
-            if (mergedWith0 != null) {
-                mergedWith0.onReceiveMerged(node, msg);
+                    return;
+                }
 
-                return;
+                processSingleMessage(node.id(), msg);
             }
+        });
+    }
 
-            if (msg.client()) {
-                listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        if (!centralizedAff)
-                            sendAllPartitions(msg, node.id(), 
cctx.gridConfig().getNetworkSendRetryCount());
-                    }
-                });
+    /**
+     * @param nodeId Node ID.
+     * @param msg
+     */
+    private void waitAndReplyToClient(final UUID nodeId, final 
GridDhtPartitionsSingleMessage msg) {
+        assert msg.client();
 
-                return;
-            }
+        listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                FinishState finishState0;
 
-            initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                @Override public void apply(IgniteInternalFuture<Boolean> f) {
-                    try {
-                        if (!f.get())
-                            return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to initialize exchange future: " 
+ this, e);
+                synchronized (GridDhtPartitionsExchangeFuture.this) {
+                    finishState0 = finishState;
+                }
 
-                        return;
-                    }
+                if (finishState0 == null) {
+                    assert discoEvt.type() == EVT_NODE_JOINED && 
CU.clientNode(discoEvt.eventNode()) : discoEvt;
 
-                    processSingleMessage(node.id(), msg);
+                    finishState0 = new FinishState(cctx.localNodeId(),
+                        initialVersion(),
+                        createPartitionsMessage(false));
                 }
-            });
-        }
+
+                sendAllPartitionsToNode(finishState0, msg, nodeId);
+            }
+        });
     }
 
     /**
@@ -1713,26 +1722,62 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param msg Message.
      */
     private void processSingleMessage(UUID nodeId, 
GridDhtPartitionsSingleMessage msg) {
+        if (msg.client()) {
+            waitAndReplyToClient(nodeId, msg);
+
+            return;
+        }
+
         boolean allReceived = false;
         boolean updateSingleMap = false;
 
+        FinishState finishState0 = null;
+
         synchronized (this) {
             assert crd != null;
 
-            if (crd.isLocal() && crdReady) {
-                if (remaining.remove(nodeId)) {
-                    updateSingleMap = true;
+            switch (state) {
+                case DONE: {
+                    assert finishState != null;
 
-                    pendingSingleUpdates++;
+                    finishState0 = finishState;
 
-                    if (stateChangeExchange() && msg.getError() != null)
-                        changeGlobalStateExceptions.put(nodeId, 
msg.getError());
+                    break;
+                }
+
+                case CRD: {
+                    assert crd.isLocal() : crd;
+
+                    if (remaining.remove(nodeId)) {
+                        updateSingleMap = true;
+
+                        pendingSingleUpdates++;
+
+                        if (stateChangeExchange() && msg.getError() != null)
+                            changeGlobalStateExceptions.put(nodeId, 
msg.getError());
+
+                        allReceived = remaining.isEmpty();
+                    }
 
-                    allReceived = remaining.isEmpty();
+                    break;
                 }
+
+                case SRV:
+                case BECOME_CRD: {
+                    singleMsgs.put(nodeId, msg);
+
+                    break;
+                }
+
+                default:
+                    assert false : state;
             }
-            else
-                singleMsgs.put(nodeId, msg);
+        }
+
+        if (finishState0 != null) {
+            sendAllPartitionsToNode(finishState0, msg, nodeId);
+
+            return;
         }
 
         if (updateSingleMap) {
@@ -1754,22 +1799,27 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
 
         if (allReceived) {
-            awaitSingleMapUpdates();
+            if (!awaitSingleMapUpdates())
+                return;
 
             onAllReceived();
         }
     }
 
     /**
-     *
+     * @return {@code False} if interrupted.
      */
-    private synchronized void awaitSingleMapUpdates() {
+    private synchronized boolean awaitSingleMapUpdates() {
         try {
             while (pendingSingleUpdates > 0)
                 U.wait(this);
+
+            return true;
         }
         catch (IgniteInterruptedCheckedException e) {
             U.warn(log, "Failed to wait for partition map updates, thread was 
interrupted: " + e);
+
+            return false;
         }
     }
 
@@ -1988,7 +2038,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         try {
             assert crd.isLocal();
 
-            assert partHistSuppliers.isEmpty();
+            assert partHistSuppliers.isEmpty() : partHistSuppliers;
 
             if (!crd.equals(discoCache.serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
@@ -2019,7 +2069,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private void finishExchangeOnCoordinator() {
         try {
-            log.info("finishExchangeOnCoordinator [topVer=" + 
topologyVersion() + ", resVer=" + exchCtx.events().topologyVersion() + ']');
+            log.info("finishExchangeOnCoordinator [topVer=" + 
topologyVersion() +
+                ", resVer=" + exchCtx.events().topologyVersion() + ']');
 
             AffinityTopologyVersion resTopVer = 
exchCtx.events().topologyVersion();
 
@@ -2044,8 +2095,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             synchronized (this) {
                 if (mergedJoinExchMsgs != null) {
                     for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
mergedJoinExchMsgs.entrySet()) {
-                        if (e.getValue() != null)
-                            msgs.put(e.getKey(), e.getValue());
+                        msgs.put(e.getKey(), e.getValue());
+
+                        updatePartitionSingleMap(e.getKey(), e.getValue());
                     }
                 }
             }
@@ -2137,6 +2189,25 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             cctx.versions().onExchange(lastVer.get().order());
 
+            GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+
+            if (exchCtx.mergeExchanges()) {
+                assert !centralizedAff;
+
+                msg.resultTopologyVersion(exchCtx.events().topologyVersion());
+
+                if (exchCtx.events().serverLeft())
+                    msg.idealAffinityDiff(idealAffDiff);
+            }
+
+            msg.prepareMarshal(cctx);
+
+            synchronized (this) {
+                finishState = new FinishState(crd.id(), 
exchCtx.events().topologyVersion(), msg);
+
+                state = ExchangeLocalState.DONE;
+            }
+
             if (centralizedAff) {
                 assert !exchCtx.mergeExchanges();
 
@@ -2191,15 +2262,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                     boolean active = !stateChangeErr && req.activate();
 
-                    ChangeGlobalStateFinishMessage msg = new 
ChangeGlobalStateFinishMessage(req.requestId(), active);
+                    ChangeGlobalStateFinishMessage stateFinishMsg = new 
ChangeGlobalStateFinishMessage(
+                        req.requestId(),
+                        active);
 
-                    cctx.discovery().sendCustomEvent(msg);
+                    cctx.discovery().sendCustomEvent(stateFinishMsg);
                 }
 
                 if (!nodes.isEmpty())
-                    sendAllPartitions(nodes, joinedNodeAff, idealAffDiff, 
null);
+                    sendAllPartitions(msg, nodes, joinedNodeAff);
 
                 onDone(exchCtx.events().topologyVersion(), err);
+
+                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
singleMsgs.entrySet())
+                    processSingleMessage(e.getKey(), e.getValue());
             }
         }
         catch (IgniteCheckedException e) {
@@ -2227,58 +2303,40 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * @param msg Request.
      * @param nodeId Node ID.
-     * @param retryCnt Number of retries.
      */
-    private void sendAllPartitions(final GridDhtPartitionsSingleMessage msg, 
final UUID nodeId, final int retryCnt) {
-        ClusterNode n = cctx.node(nodeId);
+    private void sendAllPartitionsToNode(FinishState finishState, 
GridDhtPartitionsSingleMessage msg, UUID nodeId) {
+        ClusterNode node = cctx.node(nodeId);
 
-        try {
-            if (n != null) {
-                Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+        if (node != null) {
+            GridDhtPartitionsFullMessage fullMsg = finishState.msg;
 
-                Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
+            fullMsg = fullMsg.copy();
 
-                if (affReq != null) {
-                    joinedNodeAff = 
CacheGroupAffinityMessage.createAffinityMessages(
-                        cctx,
-                        msg.exchangeId().topologyVersion(),
-                        affReq,
-                        null);
-                }
+            Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-                // TODO IGNITE-5578.
-                sendAllPartitions(F.asList(n), joinedNodeAff, null, 
msg.exchangeId());
-            }
-        }
-        catch (IgniteCheckedException e) {
-            if (e instanceof ClusterTopologyCheckedException || 
!cctx.discovery().alive(n)) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send full partition map to node, node 
left grid " +
-                        "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+            if (affReq != null) {
+                Map<Integer, CacheGroupAffinityMessage> aff = 
CacheGroupAffinityMessage.createAffinityMessages(
+                    cctx,
+                    finishState.resTopVer,
+                    affReq,
+                    null);
 
-                return;
+                fullMsg.joinedNodeAffinity(aff);
             }
 
-            if (reconnectOnError(e)) {
-                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            if (!fullMsg.exchangeId().equals(msg.exchangeId()))
+                fullMsg.exchangeId(msg.exchangeId());
 
-                return;
+            try {
+                cctx.io().send(node, fullMsg, SYSTEM_POOL);
             }
-
-            if (retryCnt > 0) {
-                long timeout = cctx.gridConfig().getNetworkSendRetryDelay();
-
-                LT.error(log, e, "Failed to send full partition map to node 
(will retry after timeout) " +
-                    "[node=" + nodeId + ", exchangeId=" + exchId + ", 
timeout=" + timeout + ']');
-
-                cctx.time().addTimeoutObject(new 
GridTimeoutObjectAdapter(timeout) {
-                    @Override public void onTimeout() {
-                        sendAllPartitions(msg, nodeId, retryCnt - 1);
-                    }
-                });
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send partitions, node failed: " + 
node);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send partitions [node=" + node + ']', 
e);
             }
-            else
-                U.error(log, "Failed to send full partition map [node=" + n + 
", exchangeId=" + exchId + ']', e);
         }
     }
 
@@ -2288,18 +2346,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     public void onReceiveFullMessage(final ClusterNode node, final 
GridDhtPartitionsFullMessage msg) {
         assert msg != null;
-
-        final UUID nodeId = node.id();
-
-        if (isDone()) {
-            if (log.isDebugEnabled())
-                log.debug("Received message for finished future [msg=" + msg + 
", fut=" + this + ']');
-
-            return;
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Received full partition map from node [nodeId=" + 
nodeId + ", msg=" + msg + ']');
+        assert msg.exchangeId() != null : msg;
+        assert !node.isDaemon() : node;
 
         initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
             @Override public void apply(IgniteInternalFuture<Boolean> f) {
@@ -2322,25 +2370,62 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param node Sender node.
      * @param msg Message.
      */
-    public void processSinglePartitionRequest(final ClusterNode node, 
GridDhtPartitionsSingleRequest msg) {
+    public void onReceivePartitionRequest(final ClusterNode node, final 
GridDhtPartitionsSingleRequest msg) {
+        assert !cctx.kernalContext().clientNode();
+        assert !node.isDaemon() : node;
+
         if (!cctx.discovery().alive(node.id()))
             return;
 
         initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
             @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                synchronized (this) {
-                    if (finishState != null && 
node.id().equals(finishState.crdId))
+                processSinglePartitionRequest(node, msg);
+            }
+        });
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Message.
+     */
+    private void processSinglePartitionRequest(ClusterNode node, 
GridDhtPartitionsSingleRequest msg) {
+        synchronized (this) {
+            switch (state) {
+                case DONE: {
+                    assert finishState != null;
+
+                    if (node.id().equals(finishState.crdId)) {
+                        log.info("Ignore partitions request, finished exchange 
with this coordinator: " + msg);
+
                         return;
+                    }
+
+                    break;
                 }
 
-                try {
-                    sendLocalPartitions(node);
+                case CRD:
+                case BECOME_CRD: {
+                    log.info("Ignore partitions request, node is coordinator: 
" + msg);
+
+                    return;
                 }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send message to coordinator: " + 
e);
+
+                case SRV: {
+
+                    break;
                 }
+
             }
-        });
+        }
+
+        // TODO 5578, backward compatibility, send state if available.
+
+        try {
+            sendLocalPartitions(node);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send message to coordinator: " + e);
+        }
     }
 
     /**
@@ -2353,21 +2438,49 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             assert msg.lastVersion() != null : msg;
 
             synchronized (this) {
-                if (crd == null || finishState != null)
+                if (crd == null) {
+                    log.info("Ignore full message, all server nodes left: " + 
msg);
+
                     return;
+                }
 
-                if (!crd.equals(node)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received full partition map from unexpected 
node [oldest=" + crd.id() +
-                            ", nodeId=" + node.id() + ']');
+                switch (state) {
+                    case CRD:
+                    case BECOME_CRD: {
+                        log.info("Ignore full message, node is coordinator: " 
+ msg);
 
-                    if (node.order() > crd.order())
-                        fullMsgs.put(node, msg);
+                        return;
+                    }
 
-                    return;
-                }
+                    case DONE: {
+                        log.info("Ignore full message, future is done: " + 
msg);
+
+                        return;
+                    }
+
+                    case SRV:
+                    case CLIENT: {
+                        if (!crd.equals(node)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received full partition map from 
unexpected node [oldest=" + crd.id() +
+                                    ", nodeId=" + node.id() + ']');
+
+                            if (node.order() > crd.order())
+                                fullMsgs.put(node, msg);
 
-                finishState = new FinishState(crd.id(), 
msg.resultTopologyVersion());
+                            return;
+                        }
+                        else {
+                            finishState = new FinishState(crd.id(),
+                                msg.resultTopologyVersion() != null ? 
msg.resultTopologyVersion() : initialVersion(),
+                                msg);
+
+                            state = ExchangeLocalState.DONE;
+
+                            break;
+                        }
+                    }
+                }
             }
 
             if (exchCtx.mergeExchanges()) {
@@ -2593,18 +2706,42 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                             boolean rmvd = remaining.remove(node.id());
 
+                            if (!rmvd) {
+                                if (mergedJoinExchMsgs != null && 
mergedJoinExchMsgs.containsKey(node.id())) {
+                                    if (mergedJoinExchMsgs.get(node.id()) == 
null) {
+                                        mergedJoinExchMsgs.remove(node.id());
+
+                                        rmvd = true;
+                                    }
+                                }
+                            }
+
                             if (node.equals(crd)) {
                                 crdChanged = true;
 
                                 crd = !srvNodes.isEmpty() ? srvNodes.get(0) : 
null;
                             }
 
-                            if (crd != null && crd.isLocal()) {
-                                if (!crdChanged && crdReady && rmvd)
-                                    allReceived = remaining.isEmpty();
+                            switch (state) {
+                                case CRD:
+                                    allReceived = rmvd && (remaining.isEmpty() 
&& F.isEmpty(mergedJoinExchMsgs));
+
+                                    break;
+
+                                case SRV:
+                                    assert crd != null;
+
+                                    if (crdChanged && crd.isLocal())
+                                        state = ExchangeLocalState.BECOME_CRD;
+
+                                    break;
                             }
 
                             crd0 = crd;
+
+                            if (crd0 == null) {
+                                finishState = new FinishState(null, 
initialVersion(), null);
+                            }
                         }
 
                         if (crd0 == null) {
@@ -2657,10 +2794,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         }
                         else {
                             if (crdChanged) {
-                                sendPartitions(crd0);
-
                                 for (Map.Entry<ClusterNode, 
GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
                                     processFullMessage(m.getKey(), 
m.getValue());
+
+                                if (!isDone())
+                                    sendPartitions(crd0);
                             }
                         }
                     }
@@ -2689,9 +2827,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         synchronized (this) {
             assert crd != null && crd.isLocal();
-            assert !crdReady;
 
-            crdReady = true;
+            state = ExchangeLocalState.CRD;
 
             if (!remaining.isEmpty())
                 remaining0 = new HashSet<>(remaining);
@@ -2882,14 +3019,30 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         private final UUID crdId;
 
         /** */
-        private final AffinityTopologyVersion topVer;
+        private final AffinityTopologyVersion resTopVer;
+
+        /** */
+        private final GridDhtPartitionsFullMessage msg;
 
         /**
          * @param crdId Coordinator node.
          */
-        FinishState(UUID crdId, AffinityTopologyVersion topVer) {
+        FinishState(UUID crdId, AffinityTopologyVersion resTopVer, 
GridDhtPartitionsFullMessage msg) {
             this.crdId = crdId;
-            this.topVer = topVer;
+            this.resTopVer = resTopVer;
+            this.msg = msg;
         }
     }
+
+    /**
+     *
+     */
+    enum ExchangeLocalState {
+        CRD,
+        SRV,
+        CLIENT,
+        BECOME_CRD,
+        DONE,
+        MERGED
+    }
 }

Reply via email to