Merge remote-tracking branch 'remotes/origin/master' into ignite-5578 # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b3f44078 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b3f44078 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b3f44078 Branch: refs/heads/ignite-5578 Commit: b3f44078eac07425a828b9fff8a184e5dc503412 Parents: 18f4929 e9a0d69 Author: sboikov <sboi...@gridgain.com> Authored: Thu Jul 27 15:27:43 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jul 27 15:27:43 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 5 +- .../processors/cache/GridCacheIoManager.java | 2 +- .../GridCachePartitionExchangeManager.java | 44 ++++++++++----- .../dht/GridClientPartitionTopology.java | 12 +++- .../dht/GridDhtPartitionTopology.java | 9 ++- .../dht/GridDhtPartitionTopologyImpl.java | 33 ++++++----- .../preloader/GridDhtPartitionExchangeId.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 36 ++++++++---- .../preloader/GridDhtPartitionsFullMessage.java | 4 +- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../near/GridNearOptimisticTxPrepareFuture.java | 2 +- .../GridNearPessimisticTxPrepareFuture.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 59 +++++++++++--------- ...cingDelayedPartitionMapExchangeSelfTest.java | 58 +++++++++++++++---- .../junits/common/GridCommonAbstractTest.java | 6 +- 15 files changed, 185 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4fdad7c,6a7258f..c0e6a11 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -130,10 -129,7 +130,10 @@@ public class GridCachePartitionExchange private static final int EXCHANGE_HISTORY_SIZE = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000); + /** TODO IGNITE-5578. */ + private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.0"); + - /** Atomic reference for pending timeout object. */ + /** Atomic reference for pending partition resend timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); /** Partition resend timeout after eviction. */ @@@ -1102,11 -1072,12 +1110,11 @@@ } /** - * @param node Node. - * @param id ID. + * @param node Destination cluster node. + * @param id Exchange ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, - id, + GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), false); @@@ -1127,7 -1098,8 +1135,7 @@@ } /** - * @param exchangeId ID. - * @param targetNode Target node. + * @param exchangeId Exchange ID. * @param clientOnlyExchange Client exchange flag. * @param sndCounters {@code True} if need send partition update counters. * @return Message. http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0dae4d2,a8e13a0..1b4bbcc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -435,18 -433,14 +435,17 @@@ public class GridDhtPartitionTopologyIm if (stopping) return; - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + ExchangeDiscoveryEvents evts = exchFut.context().events(); - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + - ", exchId=" + exchId + ']'; + assert topVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + topVer + + ", exchId=" + exchFut.exchangeId() + ']'; - if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent()) - removeNode(exchId.nodeId()); + topVer = evts.topologyVersion(); + for (DiscoveryEvent evt : evts.events()) { + if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode())) + removeNode(evt.eventNode().id()); + } - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (log.isDebugEnabled()) { @@@ -1118,21 -1113,12 +1117,21 @@@ if (stopping) return false; + if (exchangeVer == null && (topReadyFut == null || !topReadyFut.isDone())) + return false; + + if (exchangeVer != null) { + assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer; + + topVer = exchangeVer; + } + - if (cntrMap != null) { + if (incomeCntrMap != null) { // update local map partition counters - for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { - T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); + for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) { + T2<Long, Long> existCntr = this.cntrMap.get(e.getKey()); - if (cntr == null || cntr.get2() < e.getValue().get2()) + if (existCntr == null || existCntr.get2() < e.getValue().get2()) this.cntrMap.put(e.getKey(), e.getValue()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5888f77,71e41b0..f749833 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -723,10 -613,11 +726,11 @@@ public class GridDhtPartitionsExchangeF boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) { - top.update(topologyVersion(), + top.update(initialVersion(), clientTop.partitionMap(true), clientTop.updateCounters(false), - Collections.<Integer>emptySet()); + Collections.<Integer>emptySet(), + null); } } @@@ -1223,14 -1096,7 +1227,14 @@@ } /** + * @return {@code True} if exchange for local node join. + */ + boolean localJoinExchange() { + return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal(); + } + + /** - * @param node Node. + * @param node Target Node. * @throws IgniteCheckedException If failed. */ private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { @@@ -1692,291 -1507,39 +1696,297 @@@ } /** - * @param top Topology to assign. ++ * Processing of received single message. Actual processing in future may be delayed if init method was not ++ * completed, see {@link #initDone()} ++ * + * @param node Sender node. + * @param msg Single partition info. */ - private void assignPartitionStates(GridDhtPartitionTopology top) { - Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>(); - Map<Integer, Long> minCntrs = new HashMap<>(); + public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + assert !node.isDaemon() : node; + assert msg != null; + assert exchId.equals(msg.exchangeId()) : msg; + assert !cctx.kernalContext().clientNode(); - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { - assert e.getValue().partitionUpdateCounters(top.groupId()) != null; + if (msg.restoreState()) { + InitNewCoordinatorFuture newCrdFut0; - for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) { - int p = e0.getKey(); + synchronized (this) { + assert newCrdFut != null; - UUID uuid = e.getKey(); + newCrdFut0 = newCrdFut; + } - GridDhtPartitionState state = top.partitionState(uuid, p); + newCrdFut0.onMessage(node, msg); - if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING) - continue; + return; + } - Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2(); + if (!msg.client()) { + assert msg.lastVersion() != null : msg; - if (cntr == null) - cntr = 0L; + updateLastVersion(msg.lastVersion()); + } - Long minCntr = minCntrs.get(p); + GridDhtPartitionsExchangeFuture mergedWith0 = null; - if (minCntr == null || minCntr > cntr) - minCntrs.put(p, cntr); + synchronized (this) { + if (state == ExchangeLocalState.MERGED) { + assert mergedWith != null; - if (state != GridDhtPartitionState.OWNING) - continue; + mergedWith0 = mergedWith; + } + else { + assert state != ExchangeLocalState.CLIENT; - CounterWithNodes maxCntr = maxCntrs.get(p); + if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) + pendingJoinMsg = msg; + } + } + + if (mergedWith0 != null) { + mergedWith0.processMergedMessage(node, msg); + + return; + } + + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + if (!f.get()) + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); + + return; + } + + processSingleMessage(node.id(), msg); + } + }); + } + + public void waitAndReplyToNode(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + FinishState finishState0; + + synchronized (GridDhtPartitionsExchangeFuture.this) { + finishState0 = finishState; + } + + assert finishState0 != null; + + sendAllPartitionsToNode(finishState0, msg, node.id()); + } + }); + } + + /** ++ * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the ++ * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section. ++ * + * @param nodeId Node ID. + * @param msg Client's message. + */ + private void waitAndReplyToClient(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { + assert msg.client(); + + listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + FinishState finishState0; + + synchronized (GridDhtPartitionsExchangeFuture.this) { + finishState0 = finishState; + } + + if (finishState0 == null) { + assert discoEvt.type() == EVT_NODE_JOINED && CU.clientNode(discoEvt.eventNode()) : discoEvt; + + finishState0 = new FinishState(cctx.localNodeId(), + initialVersion(), + createPartitionsMessage(false)); + } + + sendAllPartitionsToNode(finishState0, msg, nodeId); + } + }); + } + + /** + * @param nodeId Sender node. - * @param msg Message. ++ * @param msg Partition single message. + */ + void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + if (msg.client()) { + waitAndReplyToClient(nodeId, msg); + + return; + } + - boolean allReceived = false; ++ boolean allReceived = false; // Received all expected messages. + boolean updateSingleMap = false; + + FinishState finishState0 = null; + + synchronized (this) { + assert crd != null; + + switch (state) { + case DONE: { + log.info("Received single message, already done [ver=" + initialVersion() + + ", node=" + nodeId + ']'); + + assert finishState != null; + + finishState0 = finishState; + + break; + } + + case CRD: { + assert crd.isLocal() : crd; + + if (remaining.remove(nodeId)) { + updateSingleMap = true; + + pendingSingleUpdates++; + + if (stateChangeExchange() && msg.getError() != null) + changeGlobalStateExceptions.put(nodeId, msg.getError()); + + allReceived = remaining.isEmpty(); + + log.info("Coordinator received single message [ver=" + initialVersion() + + ", node=" + nodeId + + ", allReceived=" + allReceived + ']'); + } + + break; + } + + case SRV: + case BECOME_CRD: { + log.info("Non-coordinator received single message [ver=" + initialVersion() + + ", node=" + nodeId + ", state=" + state + ']'); + + pendingSingleMsgs.put(nodeId, msg); + + break; + } + + default: + assert false : state; + } + } + + if (finishState0 != null) { + sendAllPartitionsToNode(finishState0, msg, nodeId); + + return; + } + + if (updateSingleMap) { + try { + // Do not update partition map, in case cluster transitioning to inactive state. + if (!deactivateCluster()) + updatePartitionSingleMap(nodeId, msg); + } + finally { + synchronized (this) { + assert pendingSingleUpdates > 0; + + pendingSingleUpdates--; + + if (pendingSingleUpdates == 0) + notifyAll(); + } + } + } + + if (allReceived) { + if (!awaitSingleMapUpdates()) + return; + + onAllReceived(); + } + } + + /** + * @return {@code False} if interrupted. + */ + private synchronized boolean awaitSingleMapUpdates() { + try { + while (pendingSingleUpdates > 0) + U.wait(this); + + return true; + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); + + return false; + } + } + + /** + * @param fut Affinity future. + */ + private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) { + try { + assert fut.isDone(); + + Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get(); + + GridDhtPartitionsFullMessage m = createPartitionsMessage(false); + + CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); + + if (log.isDebugEnabled()) + log.debug("Centralized affinity exchange, send affinity change message: " + msg); + + cctx.discovery().sendCustomEvent(msg); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * @param top Topology to assign. + */ + private void assignPartitionStates(GridDhtPartitionTopology top) { + Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>(); + Map<Integer, Long> minCntrs = new HashMap<>(); + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { + assert e.getValue().partitionUpdateCounters(top.groupId()) != null; + + for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) { + int p = e0.getKey(); + + UUID uuid = e.getKey(); + + GridDhtPartitionState state = top.partitionState(uuid, p); + + if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING) + continue; + + Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2(); + + if (cntr == null) + cntr = 0L; + + Long minCntr = minCntrs.get(p); + + if (minCntr == null || minCntr > cntr) + minCntrs.put(p, cntr); + + if (state != GridDhtPartitionState.OWNING) + continue; + + CounterWithNodes maxCntr = maxCntrs.get(p); if (maxCntr == null || cntr > maxCntr.cnt) maxCntrs.put(p, new CounterWithNodes(cntr, uuid)); @@@ -2363,235 -1786,41 +2373,235 @@@ err = new IgniteCheckedException("Cluster state change failed."); - cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req); + cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req); + } + + boolean active = !stateChangeErr && req.activate(); + + ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage( + req.requestId(), + active); + + cctx.discovery().sendCustomEvent(stateFinishMsg); + } + + if (!nodes.isEmpty()) + sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff); + + onDone(exchCtx.events().topologyVersion(), err); + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : pendingSingleMsgs.entrySet()) + processSingleMessage(e.getKey(), e.getValue()); + } + } + catch (IgniteCheckedException e) { + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + onDone(e); + } + } + + /** + * + */ + private void assignPartitionsStates() { + if (cctx.database().persistenceEnabled()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; + + assignPartitionStates(grp.topology()); + } + } + } + + /** + * @param msg Request. + * @param nodeId Node ID. + */ + private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) { + ClusterNode node = cctx.node(nodeId); + + if (node != null) { + GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy(); + fullMsg.exchangeId(msg.exchangeId()); + + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + + if (affReq != null) { + Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages( + cctx, + finishState.resTopVer, + affReq, + null); + + fullMsg.joinedNodeAffinity(aff); + } + + if (!fullMsg.exchangeId().equals(msg.exchangeId())) + fullMsg.exchangeId(msg.exchangeId()); + + try { + cctx.io().send(node, fullMsg, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partitions [node=" + node + ']', e); + } + } + else if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + nodeId); + + } + + /** + * @param node Sender node. + * @param msg Full partition info. + */ + public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitionsFullMessage msg) { + assert msg != null; + assert msg.exchangeId() != null : msg; + assert !node.isDaemon() : node; + + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + if (!f.get()) + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); + + return; + } + + processFullMessage(true, node, msg); + } + }); + } + + /** + * @param node Sender node. - * @param msg Message. ++ * @param msg Message with full partition info. + */ + public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) { + assert !cctx.kernalContext().clientNode() || msg.restoreState(); + assert !node.isDaemon() && !CU.clientNode(node) : node; + + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + processSinglePartitionRequest(node, msg); + } + }); + } + + /** + * @param node Sender node. + * @param msg Message. + */ + private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) { + FinishState finishState0 = null; + + synchronized (this) { + if (crd == null) { + log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']'); + + return; + } + + switch (state) { + case DONE: { + assert finishState != null; + + if (node.id().equals(finishState.crdId)) { + log.info("Ignore partitions request, finished exchange with this coordinator: " + msg); + + return; + } + + finishState0 = finishState; + + break; + } + + case CRD: + case BECOME_CRD: { + log.info("Ignore partitions request, node is coordinator: " + msg); + + return; + } + + case CLIENT: + case SRV: { + if (!cctx.discovery().alive(node)) { + log.info("Ignore restore state request, node is not alive [node=" + node.id() + ']'); + + return; } - boolean active = !stateChangeErr && req.activate(); + if (msg.restoreState()) { + if (!node.equals(crd)) { + if (node.order() > crd.order()) { + log.info("Received restore state request, change coordinator [oldCrd=" + crd.id() + + "newCrd=" + node.id() + ']'); - ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active); + crd = node; // Do not allow to process FullMessage from old coordinator. + } + else { + log.info("Ignore restore state request, coordinator changed [oldCrd=" + crd.id() + + "newCrd=" + node.id() + ']'); - cctx.discovery().sendCustomEvent(msg); - } + return; + } + } + } - if (!nodes.isEmpty()) - sendAllPartitions(nodes); + break; + } - onDone(exchangeId().topologyVersion(), err); + default: + assert false : state; } } - catch (IgniteCheckedException e) { - if (reconnectOnError(e)) - onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); - else - onDone(e); - } - } - /** - * - */ - private void assignPartitionsStates() { - if (cctx.database().persistenceEnabled()) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; + if (msg.restoreState()) { + try { + assert msg.restoreExchangeId() != null : msg; - assignPartitionStates(grp.topology()); + GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage( + msg.restoreExchangeId(), + cctx.kernalContext().clientNode(), + true); + + if (localJoinExchange() && finishState0 == null) + res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); + + res.restoreState(true); + + res.finishMessage(finishState0 != null ? finishState0.msg : null); + + cctx.io().send(node, res, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException ignored) { + if (log.isDebugEnabled()) + log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']'); } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e); + } + + return; + } + + try { + sendLocalPartitions(node); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message to coordinator: " + e); } } @@@ -2726,19 -1960,21 +2736,21 @@@ CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null) { - grp.topology().update(topologyVersion(), + grp.topology().update(exchCtx.events().topologyVersion(), entry.getValue(), cntrMap, - msg.partsToReload(cctx.localNodeId(), grpId)); + msg.partsToReload(cctx.localNodeId(), grpId), + null); } else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) { - cctx.exchange().clientTopology(grpId, this).update(topologyVersion(), + cctx.exchange().clientTopology(grpId, this).update(exchCtx.events().topologyVersion(), entry.getValue(), cntrMap, - Collections.<Integer>emptySet()); + Collections.<Integer>emptySet(), + null); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b3f44078/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ----------------------------------------------------------------------