Repository: ignite Updated Branches: refs/heads/ignite-5578 3db3266c2 -> 63822921f
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/63822921 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/63822921 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/63822921 Branch: refs/heads/ignite-5578 Commit: 63822921f78fcd7af1690ab68cf7a5de418a97bc Parents: 3db3266 Author: sboikov <[email protected]> Authored: Thu Jul 20 16:07:34 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jul 20 16:07:34 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 551 ++++++++++++------- 2 files changed, 353 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/63822921/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 81e83c9..93b1729 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1431,7 +1431,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana null, null); - exchFut.processSinglePartitionRequest(node, msg); + exchFut.onReceivePartitionRequest(node, msg); } finally { leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/63822921/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 17bea14..696dd2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -142,9 +142,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private ClusterNode crd; - /** */ - private boolean crdReady; - /** ExchangeFuture id. */ private final GridDhtPartitionExchangeId exchId; @@ -236,6 +233,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private final AtomicBoolean done = new AtomicBoolean(); /** */ + private ExchangeLocalState state; + + /** */ @GridToStringExclude private ExchangeContext exchCtx; @@ -484,8 +484,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); + assert state == null : state; + if (crdNode) - crdReady = true; + state = ExchangeLocalState.CRD; + else + state = cctx.kernalContext().clientNode() ? ExchangeLocalState.CLIENT : ExchangeLocalState.SRV; exchLog.info("Started exchange init [topVer=" + topVer + ", crd=" + crdNode + @@ -1269,22 +1273,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param joinedNodeAff Affinity if was requested by some nodes. * @throws IgniteCheckedException If failed. */ - private void sendAllPartitions(Collection<ClusterNode> nodes, - Map<Integer, CacheGroupAffinityMessage> joinedNodeAff, - Map<Integer, CacheGroupAffinityMessage> idealAffDiff, - @Nullable GridDhtPartitionExchangeId msgExchId) + private void sendAllPartitions( + GridDhtPartitionsFullMessage msg, + Collection<ClusterNode> nodes, + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) throws IgniteCheckedException { boolean singleNode = nodes.size() == 1; - GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); - - if (exchCtx.mergeExchanges()) { - msg.resultTopologyVersion(exchCtx.events().topologyVersion()); - - if (exchCtx.events().serverLeft()) - msg.idealAffinityDiff(idealAffDiff); - } - GridDhtPartitionsFullMessage joinedNodeMsg = null; assert !nodes.contains(cctx.localNode()); @@ -1294,8 +1289,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ", exchId=" + exchId + ", msg=" + msg + ']'); } - msg.prepareMarshal(cctx); - for (ClusterNode node : nodes) { GridDhtPartitionsFullMessage sndMsg = msg; @@ -1318,9 +1311,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } try { - GridDhtPartitionExchangeId sndExchId = msgExchId; + GridDhtPartitionExchangeId sndExchId = exchangeId(); - if (sndExchId == null && mergedJoinExchMsgs != null) { + if (mergedJoinExchMsgs != null) { GridDhtPartitionsSingleMessage mergedMsg = mergedJoinExchMsgs.get(node.id()); if (mergedMsg != null) @@ -1533,32 +1526,44 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private GridDhtPartitionsSingleMessage pendingJoinMsg; - private boolean addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) { - if (mergedJoinExchMsgs == null) - mergedJoinExchMsgs = new LinkedHashMap<>(); + private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartitionsSingleMessage msg) { + assert Thread.holdsLock(this); + assert node != null; + assert state == ExchangeLocalState.CRD : state; - boolean wait = false; + UUID nodeId = node.id(); - if (msg != null) { - log.info("Merge server join exchange, message received [curFut=" + topologyVersion() + - ", node=" + nodeId + ']'); + boolean wait = false; - mergedJoinExchMsgs.put(nodeId, msg); + if (CU.clientNode(node)) { + if (msg != null) + waitAndReplyToClient(nodeId, msg); } else { - if (cctx.discovery().alive(nodeId)) { - log.info("Merge server join exchange, wait for message [curFut=" + topologyVersion() + - ", node=" + nodeId + ']'); + if (mergedJoinExchMsgs == null) + mergedJoinExchMsgs = new LinkedHashMap<>(); - wait = true; + if (msg != null) { + log.info("Merge server join exchange, message received [curFut=" + topologyVersion() + + ", node=" + nodeId + ']'); - awaitMergedMsgs++; + mergedJoinExchMsgs.put(nodeId, msg); } else { - log.info("Merge server join exchange, awaited node left [curFut=" + topologyVersion() + - ", node=" + nodeId + ']'); + if (cctx.discovery().alive(nodeId)) { + log.info("Merge server join exchange, wait for message [curFut=" + topologyVersion() + + ", node=" + nodeId + ']'); + + wait = true; - mergedJoinExchMsgs.put(nodeId, null); + mergedJoinExchMsgs.put(nodeId, null); + + awaitMergedMsgs++; + } + else { + log.info("Merge server join exchange, awaited node left [curFut=" + topologyVersion() + + ", node=" + nodeId + ']'); + } } } @@ -1576,23 +1581,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert !isDone(); assert !initFut.isDone(); assert mergedWith == null; + assert state == null; + + state = ExchangeLocalState.MERGED; mergedWith = fut; ClusterNode joinedNode = discoEvt.eventNode(); - if (CU.clientNode(joinedNode)) { - if (pendingJoinMsg != null) { - if (mergedJoinExchMsgs == null) - mergedJoinExchMsgs = new LinkedHashMap<>(); - - mergedJoinExchMsgs.put(joinedNode.id(), pendingJoinMsg); - } - - wait = false; - } - else - wait = fut.addMergedJoinExchange(joinedNode.id(), pendingJoinMsg); + wait = fut.addMergedJoinExchange(joinedNode, pendingJoinMsg); } return wait; @@ -1602,13 +1599,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param node * @param msg */ - void onReceiveMerged(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + void processMergedMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { if (msg.client()) { - listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - sendAllPartitions(msg, node.id(), 0); - } - }); + waitAndReplyToClient(node.id(), msg); return; } @@ -1616,7 +1609,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean done = false; synchronized (this) { - boolean process = mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id()); + boolean process = mergedJoinExchMsgs != null && + mergedJoinExchMsgs.containsKey(node.id()) && + mergedJoinExchMsgs.get(node.id()) == null; log.info("Merge server join exchange, received message [curFut=" + topologyVersion() + ", node=" + node.id() + @@ -1644,68 +1639,82 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param msg Single partition info. */ public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + assert !node.isDaemon() : node; assert msg != null; assert exchId.equals(msg.exchangeId()) : msg; + assert !cctx.kernalContext().clientNode(); - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Received message for finished future (will reply only to sender) [msg=" + msg + - ", fut=" + this + ']'); + if (!msg.client()) { + assert msg.lastVersion() != null : msg; - if (!centralizedAff) - sendAllPartitions(msg, node.id(), cctx.gridConfig().getNetworkSendRetryCount()); + updateLastVersion(msg.lastVersion()); } - else { - if (!msg.client()) { - assert msg.lastVersion() != null : msg; - updateLastVersion(msg.lastVersion()); + GridDhtPartitionsExchangeFuture mergedWith0 = null; + + synchronized (this) { + if (state == ExchangeLocalState.MERGED) { + assert mergedWith != null; + + mergedWith0 = mergedWith; } + else { + assert state != ExchangeLocalState.CLIENT; - GridDhtPartitionsExchangeFuture mergedWith0 = null; + if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) + pendingJoinMsg = msg; + } + } - synchronized (this) { - if (mergedWith != null) - mergedWith0 = mergedWith; - else { - if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) - pendingJoinMsg = msg; + if (mergedWith0 != null) { + mergedWith0.processMergedMessage(node, msg); + + return; + } + + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + if (!f.get()) + return; } - } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); - if (mergedWith0 != null) { - mergedWith0.onReceiveMerged(node, msg); + return; + } - return; + processSingleMessage(node.id(), msg); } + }); + } - if (msg.client()) { - listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - if (!centralizedAff) - sendAllPartitions(msg, node.id(), cctx.gridConfig().getNetworkSendRetryCount()); - } - }); + /** + * @param nodeId Node ID. + * @param msg + */ + private void waitAndReplyToClient(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { + assert msg.client(); - return; - } + listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + FinishState finishState0; - initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> f) { - try { - if (!f.get()) - return; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize exchange future: " + this, e); + synchronized (GridDhtPartitionsExchangeFuture.this) { + finishState0 = finishState; + } - return; - } + if (finishState0 == null) { + assert discoEvt.type() == EVT_NODE_JOINED && CU.clientNode(discoEvt.eventNode()) : discoEvt; - processSingleMessage(node.id(), msg); + finishState0 = new FinishState(cctx.localNodeId(), + initialVersion(), + createPartitionsMessage(false)); } - }); - } + + sendAllPartitionsToNode(finishState0, msg, nodeId); + } + }); } /** @@ -1713,26 +1722,62 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param msg Message. */ private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + if (msg.client()) { + waitAndReplyToClient(nodeId, msg); + + return; + } + boolean allReceived = false; boolean updateSingleMap = false; + FinishState finishState0 = null; + synchronized (this) { assert crd != null; - if (crd.isLocal() && crdReady) { - if (remaining.remove(nodeId)) { - updateSingleMap = true; + switch (state) { + case DONE: { + assert finishState != null; - pendingSingleUpdates++; + finishState0 = finishState; - if (stateChangeExchange() && msg.getError() != null) - changeGlobalStateExceptions.put(nodeId, msg.getError()); + break; + } + + case CRD: { + assert crd.isLocal() : crd; + + if (remaining.remove(nodeId)) { + updateSingleMap = true; + + pendingSingleUpdates++; + + if (stateChangeExchange() && msg.getError() != null) + changeGlobalStateExceptions.put(nodeId, msg.getError()); + + allReceived = remaining.isEmpty(); + } - allReceived = remaining.isEmpty(); + break; } + + case SRV: + case BECOME_CRD: { + singleMsgs.put(nodeId, msg); + + break; + } + + default: + assert false : state; } - else - singleMsgs.put(nodeId, msg); + } + + if (finishState0 != null) { + sendAllPartitionsToNode(finishState0, msg, nodeId); + + return; } if (updateSingleMap) { @@ -1754,22 +1799,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (allReceived) { - awaitSingleMapUpdates(); + if (!awaitSingleMapUpdates()) + return; onAllReceived(); } } /** - * + * @return {@code False} if interrupted. */ - private synchronized void awaitSingleMapUpdates() { + private synchronized boolean awaitSingleMapUpdates() { try { while (pendingSingleUpdates > 0) U.wait(this); + + return true; } catch (IgniteInterruptedCheckedException e) { U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); + + return false; } } @@ -1988,7 +2038,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { assert crd.isLocal(); - assert partHistSuppliers.isEmpty(); + assert partHistSuppliers.isEmpty() : partHistSuppliers; if (!crd.equals(discoCache.serverNodes().get(0))) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { @@ -2019,7 +2069,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void finishExchangeOnCoordinator() { try { - log.info("finishExchangeOnCoordinator [topVer=" + topologyVersion() + ", resVer=" + exchCtx.events().topologyVersion() + ']'); + log.info("finishExchangeOnCoordinator [topVer=" + topologyVersion() + + ", resVer=" + exchCtx.events().topologyVersion() + ']'); AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion(); @@ -2044,8 +2095,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte synchronized (this) { if (mergedJoinExchMsgs != null) { for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) { - if (e.getValue() != null) - msgs.put(e.getKey(), e.getValue()); + msgs.put(e.getKey(), e.getValue()); + + updatePartitionSingleMap(e.getKey(), e.getValue()); } } } @@ -2137,6 +2189,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.versions().onExchange(lastVer.get().order()); + GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + + if (exchCtx.mergeExchanges()) { + assert !centralizedAff; + + msg.resultTopologyVersion(exchCtx.events().topologyVersion()); + + if (exchCtx.events().serverLeft()) + msg.idealAffinityDiff(idealAffDiff); + } + + msg.prepareMarshal(cctx); + + synchronized (this) { + finishState = new FinishState(crd.id(), exchCtx.events().topologyVersion(), msg); + + state = ExchangeLocalState.DONE; + } + if (centralizedAff) { assert !exchCtx.mergeExchanges(); @@ -2191,15 +2262,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean active = !stateChangeErr && req.activate(); - ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active); + ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage( + req.requestId(), + active); - cctx.discovery().sendCustomEvent(msg); + cctx.discovery().sendCustomEvent(stateFinishMsg); } if (!nodes.isEmpty()) - sendAllPartitions(nodes, joinedNodeAff, idealAffDiff, null); + sendAllPartitions(msg, nodes, joinedNodeAff); onDone(exchCtx.events().topologyVersion(), err); + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : singleMsgs.entrySet()) + processSingleMessage(e.getKey(), e.getValue()); } } catch (IgniteCheckedException e) { @@ -2227,58 +2303,40 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param msg Request. * @param nodeId Node ID. - * @param retryCnt Number of retries. */ - private void sendAllPartitions(final GridDhtPartitionsSingleMessage msg, final UUID nodeId, final int retryCnt) { - ClusterNode n = cctx.node(nodeId); + private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) { + ClusterNode node = cctx.node(nodeId); - try { - if (n != null) { - Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + if (node != null) { + GridDhtPartitionsFullMessage fullMsg = finishState.msg; - Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; + fullMsg = fullMsg.copy(); - if (affReq != null) { - joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages( - cctx, - msg.exchangeId().topologyVersion(), - affReq, - null); - } + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); - // TODO IGNITE-5578. - sendAllPartitions(F.asList(n), joinedNodeAff, null, msg.exchangeId()); - } - } - catch (IgniteCheckedException e) { - if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) { - if (log.isDebugEnabled()) - log.debug("Failed to send full partition map to node, node left grid " + - "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); + if (affReq != null) { + Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages( + cctx, + finishState.resTopVer, + affReq, + null); - return; + fullMsg.joinedNodeAffinity(aff); } - if (reconnectOnError(e)) { - onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + if (!fullMsg.exchangeId().equals(msg.exchangeId())) + fullMsg.exchangeId(msg.exchangeId()); - return; + try { + cctx.io().send(node, fullMsg, SYSTEM_POOL); } - - if (retryCnt > 0) { - long timeout = cctx.gridConfig().getNetworkSendRetryDelay(); - - LT.error(log, e, "Failed to send full partition map to node (will retry after timeout) " + - "[node=" + nodeId + ", exchangeId=" + exchId + ", timeout=" + timeout + ']'); - - cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(timeout) { - @Override public void onTimeout() { - sendAllPartitions(msg, nodeId, retryCnt - 1); - } - }); + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partitions [node=" + node + ']', e); } - else - U.error(log, "Failed to send full partition map [node=" + n + ", exchangeId=" + exchId + ']', e); } } @@ -2288,18 +2346,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitionsFullMessage msg) { assert msg != null; - - final UUID nodeId = node.id(); - - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Received message for finished future [msg=" + msg + ", fut=" + this + ']'); - - return; - } - - if (log.isDebugEnabled()) - log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); + assert msg.exchangeId() != null : msg; + assert !node.isDaemon() : node; initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> f) { @@ -2322,25 +2370,62 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param node Sender node. * @param msg Message. */ - public void processSinglePartitionRequest(final ClusterNode node, GridDhtPartitionsSingleRequest msg) { + public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) { + assert !cctx.kernalContext().clientNode(); + assert !node.isDaemon() : node; + if (!cctx.discovery().alive(node.id())) return; initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> fut) { - synchronized (this) { - if (finishState != null && node.id().equals(finishState.crdId)) + processSinglePartitionRequest(node, msg); + } + }); + } + + /** + * @param node Sender node. + * @param msg Message. + */ + private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) { + synchronized (this) { + switch (state) { + case DONE: { + assert finishState != null; + + if (node.id().equals(finishState.crdId)) { + log.info("Ignore partitions request, finished exchange with this coordinator: " + msg); + return; + } + + break; } - try { - sendLocalPartitions(node); + case CRD: + case BECOME_CRD: { + log.info("Ignore partitions request, node is coordinator: " + msg); + + return; } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send message to coordinator: " + e); + + case SRV: { + + break; } + } - }); + } + + // TODO 5578, backward compatibility, send state if available. + + try { + sendLocalPartitions(node); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message to coordinator: " + e); + } } /** @@ -2353,21 +2438,49 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert msg.lastVersion() != null : msg; synchronized (this) { - if (crd == null || finishState != null) + if (crd == null) { + log.info("Ignore full message, all server nodes left: " + msg); + return; + } - if (!crd.equals(node)) { - if (log.isDebugEnabled()) - log.debug("Received full partition map from unexpected node [oldest=" + crd.id() + - ", nodeId=" + node.id() + ']'); + switch (state) { + case CRD: + case BECOME_CRD: { + log.info("Ignore full message, node is coordinator: " + msg); - if (node.order() > crd.order()) - fullMsgs.put(node, msg); + return; + } - return; - } + case DONE: { + log.info("Ignore full message, future is done: " + msg); + + return; + } + + case SRV: + case CLIENT: { + if (!crd.equals(node)) { + if (log.isDebugEnabled()) + log.debug("Received full partition map from unexpected node [oldest=" + crd.id() + + ", nodeId=" + node.id() + ']'); + + if (node.order() > crd.order()) + fullMsgs.put(node, msg); - finishState = new FinishState(crd.id(), msg.resultTopologyVersion()); + return; + } + else { + finishState = new FinishState(crd.id(), + msg.resultTopologyVersion() != null ? msg.resultTopologyVersion() : initialVersion(), + msg); + + state = ExchangeLocalState.DONE; + + break; + } + } + } } if (exchCtx.mergeExchanges()) { @@ -2593,18 +2706,42 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean rmvd = remaining.remove(node.id()); + if (!rmvd) { + if (mergedJoinExchMsgs != null && mergedJoinExchMsgs.containsKey(node.id())) { + if (mergedJoinExchMsgs.get(node.id()) == null) { + mergedJoinExchMsgs.remove(node.id()); + + rmvd = true; + } + } + } + if (node.equals(crd)) { crdChanged = true; crd = !srvNodes.isEmpty() ? srvNodes.get(0) : null; } - if (crd != null && crd.isLocal()) { - if (!crdChanged && crdReady && rmvd) - allReceived = remaining.isEmpty(); + switch (state) { + case CRD: + allReceived = rmvd && (remaining.isEmpty() && F.isEmpty(mergedJoinExchMsgs)); + + break; + + case SRV: + assert crd != null; + + if (crdChanged && crd.isLocal()) + state = ExchangeLocalState.BECOME_CRD; + + break; } crd0 = crd; + + if (crd0 == null) { + finishState = new FinishState(null, initialVersion(), null); + } } if (crd0 == null) { @@ -2657,10 +2794,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } else { if (crdChanged) { - sendPartitions(crd0); - for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) processFullMessage(m.getKey(), m.getValue()); + + if (!isDone()) + sendPartitions(crd0); } } } @@ -2689,9 +2827,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte synchronized (this) { assert crd != null && crd.isLocal(); - assert !crdReady; - crdReady = true; + state = ExchangeLocalState.CRD; if (!remaining.isEmpty()) remaining0 = new HashSet<>(remaining); @@ -2882,14 +3019,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private final UUID crdId; /** */ - private final AffinityTopologyVersion topVer; + private final AffinityTopologyVersion resTopVer; + + /** */ + private final GridDhtPartitionsFullMessage msg; /** * @param crdId Coordinator node. */ - FinishState(UUID crdId, AffinityTopologyVersion topVer) { + FinishState(UUID crdId, AffinityTopologyVersion resTopVer, GridDhtPartitionsFullMessage msg) { this.crdId = crdId; - this.topVer = topVer; + this.resTopVer = resTopVer; + this.msg = msg; } } + + /** + * + */ + enum ExchangeLocalState { + CRD, + SRV, + CLIENT, + BECOME_CRD, + DONE, + MERGED + } }
