Repository: ignite Updated Branches: refs/heads/ignite-5578 688e9b041 -> 5a663e67f
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a663e67 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a663e67 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a663e67 Branch: refs/heads/ignite-5578 Commit: 5a663e67f078f615abadb4187fe6c1a9b5b5e96a Parents: 688e9b0 Author: sboikov <[email protected]> Authored: Fri Jul 21 11:40:16 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Jul 21 18:27:09 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 28 +- .../GridCachePartitionExchangeManager.java | 15 +- .../GridDhtPartitionsAbstractMessage.java | 15 +- .../GridDhtPartitionsExchangeFuture.java | 421 +++++++++++++------ .../GridDhtPartitionsSingleMessage.java | 39 +- .../dht/preloader/InitNewCoordinatorFuture.java | 179 ++++++++ .../distributed/CacheExchangeMergeTest.java | 271 ++++++++++-- 7 files changed, 784 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 7f55e79..d5b4be7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1564,19 +1564,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * Called on exchange initiated by server node leave. * * @param fut Exchange future. + * @param crd Coordinator flag. * @throws IgniteCheckedException If failed. * @return {@code True} if affinity should be assigned by coordinator. */ - public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { ClusterNode leftNode = fut.discoveryEvent().eventNode(); assert !leftNode.isClient() : leftNode; - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; + if (crd) { + // Need initialize CacheGroupHolders if this node become coordinator on this exchange. 
+ forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc); - grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + cache.aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + } + }); + } + else { + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + } + }); } synchronized (mux) { @@ -1600,12 +1612,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { CacheGroupHolder grpHolder = grpHolders.get(desc.groupId()); - if (grpHolder != null) { - if (grpHolder.client()) // Affinity for non-client holders calculated in {@link #onServerLeft}. - grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); - + if (grpHolder != null) return; - } // Need initialize holders and affinity if this node became coordinator during this exchange. final Integer grpId = desc.groupId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 93b1729..d26ca0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -598,7 +598,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param ver Node version. * @return Supported exchange protocol version. */ - static int exchangeProtocolVersion(IgniteProductVersion ver) { + public static int exchangeProtocolVersion(IgniteProductVersion ver) { if (ver.compareToIgnoreTimestamp(EXCHANGE_PROTOCOL_2_SINCE) >= 0) return 2; @@ -1884,6 +1884,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** Busy flag used as performance optimization to stop current preloading. */ private volatile boolean busy; + /** */ + private boolean crd; + /** * Constructor. 
*/ @@ -2112,7 +2115,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana lastInitializedFut = exchFut; - exchFut.init(); + boolean newCrd = false; + + if (!crd) { + List<ClusterNode> srvNodes = exchFut.discoCache().serverNodes(); + + crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal(); + } + + exchFut.init(newCrd); int dumpCnt = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 466ec03..9adbf0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -34,7 +34,10 @@ import org.jetbrains.annotations.Nullable; */ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { /** */ - protected static final byte COMPRESSED_FLAG_MASK = 1; + private static final byte COMPRESSED_FLAG_MASK = 0x01; + + /** */ + private static final byte RESTORE_STATE_FLAG_MASK = 0x02; /** */ private static final long serialVersionUID = 0L; @@ -46,7 +49,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage private GridCacheVersion lastVer; /** */ - private byte flags; + protected byte flags; /** * Required by {@link Externalizable}. @@ -131,6 +134,14 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK); } + public void restoreState(boolean restoreState) { + flags = restoreState ? 
(byte)(flags | RESTORE_STATE_FLAG_MASK) : (byte)(flags & ~RESTORE_STATE_FLAG_MASK); + } + + boolean restoreState() { + return (flags & RESTORE_STATE_FLAG_MASK) != 0; + } + /** {@inheritDoc} */ @Override public byte fieldsCount() { return 5; http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 68e9951..3674276 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -78,8 +78,8 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.GridPartitionStateMap; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -88,7 +88,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -175,7 +174,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * Messages received on non-coordinator are stored in case if this node * becomes coordinator. */ - private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>(); + private final Map<UUID, GridDhtPartitionsSingleMessage> pendingSingleMsgs = new ConcurrentHashMap8<>(); /** Messages received from new coordinator. */ private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>(); @@ -243,6 +242,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private FinishState finishState; + /** */ + @GridToStringExclude + private InitNewCoordinatorFuture newCrdFut; + /** * @param cctx Cache context. * @param busyLock Busy lock. 
@@ -277,6 +280,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); } + GridCacheSharedContext sharedContext() { + return cctx; + } + /** {@inheritDoc} */ @Override public boolean skipForExchangeMerge() { return false; @@ -456,7 +463,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * * @throws IgniteInterruptedCheckedException If interrupted. */ - public void init() throws IgniteInterruptedCheckedException { + public void init(boolean newCrd) throws IgniteInterruptedCheckedException { if (isDone()) return; @@ -484,24 +491,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert state == null : state; - if (crdNode) { - if (!remaining.isEmpty()) { - IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false); - - if (fut != null) - fut.get(); - } - + if (crdNode) state = ExchangeLocalState.CRD; - } else state = cctx.kernalContext().clientNode() ? ExchangeLocalState.CLIENT : ExchangeLocalState.SRV; exchLog.info("Started exchange init [topVer=" + topVer + ", crd=" + crdNode + - ", evt=" + discoEvt.type() + - ", node=" + discoEvt.node() + - ", evtNode=" + discoEvt.node() + + ", evt=" + IgniteUtils.gridEventName(discoEvt.type()) + + ", evtNode=" + discoEvt.eventNode().id() + ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)discoEvt).customMessage() : null) + ']'); @@ -571,6 +569,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + if (newCrd) { + IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false); + + if (fut != null) + fut.get(); + } + updateTopologies(crdNode); if (exchange != null) { @@ -867,7 +872,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte warnNoAffinityNodes(); - centralizedAff = cctx.affinity().onServerLeft(this); + centralizedAff = cctx.affinity().onServerLeft(this, crd); } else cctx.affinity().onServerJoin(this, crd); @@ -1199,7 +1204,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @return {@code True} if exchange for local node join. */ - private boolean localJoinExchange() { + boolean localJoinExchange() { return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal(); } @@ -1277,13 +1282,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param nodes Nodes. * @param joinedNodeAff Affinity if was requested by some nodes. - * @throws IgniteCheckedException If failed. 
*/ private void sendAllPartitions( GridDhtPartitionsFullMessage msg, Collection<ClusterNode> nodes, - Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) - throws IgniteCheckedException { + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) { boolean singleNode = nodes.size() == 1; GridDhtPartitionsFullMessage joinedNodeMsg = null; @@ -1334,13 +1337,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.io().send(node, sndMsg, SYSTEM_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); + } catch (IgniteCheckedException e) { - if (cctx.io().checkNodeLeft(node.id(), e, false)) { - if (log.isDebugEnabled()) - log.debug("Failed to send partitions, node failed: " + node); - } - else - U.error(log, "Failed to send partitions [node=" + node + ']', e); + U.error(log, "Failed to send partitions [node=" + node + ']', e); } } } @@ -1498,7 +1500,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * Cleans up resources to avoid excessive memory usage. */ public void cleanUp() { - singleMsgs.clear(); + pendingSingleMsgs.clear(); fullMsgs.clear(); msgs.clear(); changeGlobalStateExceptions.clear(); @@ -1650,6 +1652,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchId.equals(msg.exchangeId()) : msg; assert !cctx.kernalContext().clientNode(); + if (msg.restoreState()) { + InitNewCoordinatorFuture newCrdFut0; + + synchronized (this) { + assert newCrdFut != null; + + newCrdFut0 = newCrdFut; + } + + newCrdFut0.onMessage(node, msg); + + return; + } + + if (!msg.client()) { assert msg.lastVersion() != null : msg; @@ -1697,7 +1714,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param nodeId Node ID. - * @param msg + * @param msg Client's message. */ private void waitAndReplyToClient(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { assert msg.client(); @@ -1780,7 +1797,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.info("Non-coordinator received single message [ver=" + initialVersion() + ", node=" + nodeId + ", state=" + state + ']'); - singleMsgs.put(nodeId, msg); + pendingSingleMsgs.put(nodeId, msg); break; } @@ -2292,7 +2309,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte onDone(exchCtx.events().topologyVersion(), err); - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : singleMsgs.entrySet()) + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : pendingSingleMsgs.entrySet()) processSingleMessage(e.getKey(), e.getValue()); } } @@ -2379,7 +2396,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } - processFullMessage(node, msg); + processFullMessage(true, node, msg); } }); } @@ -2389,11 +2406,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param msg Message. 
*/ public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) { - assert !cctx.kernalContext().clientNode(); - assert !node.isDaemon() : node; - - if (!cctx.discovery().alive(node.id())) - return; + assert !cctx.kernalContext().clientNode() || msg.restoreState(); + assert !node.isDaemon() && !CU.clientNode(node) : node; initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> fut) { @@ -2407,7 +2421,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param msg Message. */ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) { + FinishState finishState0 = null; + synchronized (this) { + if (crd == null) { + log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']'); + + return; + } + switch (state) { case DONE: { assert finishState != null; @@ -2418,6 +2440,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } + finishState0 = finishState; + break; } @@ -2428,15 +2452,64 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } + case CLIENT: case SRV: { + if (!cctx.discovery().alive(node)) { + log.info("Ignore restore state request, node is not alive [node=" + node.id() + ']'); + + return; + } + + if (msg.restoreState()) { + if (!node.equals(crd)) { + if (node.order() > crd.order()) { + log.info("Received restore state request, change coordinator [oldCrd=" + crd.id() + + "newCrd=" + node.id() + ']'); + + crd = node; // Do not allow to process FullMessage from old coordinator. + } + else { + log.info("Ignore restore state request, coordinator changed [oldCrd=" + crd.id() + + "newCrd=" + node.id() + ']'); + + return; + } + } + } break; } + default: + assert false : state; } } - // TODO 5578, backward compatibility, send state if available. + if (msg.restoreState()) { + try { + GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage(msg.exchangeId(), + cctx.kernalContext().clientNode(), + true); + + if (localJoinExchange()) + res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); + + res.restoreState(true); + + res.finishMessage(finishState0 != null ? finishState0.msg : null); + + cctx.io().send(node, res, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException ignored) { + if (log.isDebugEnabled()) + log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e); + } + + return; + } try { sendLocalPartitions(node); @@ -2450,60 +2523,66 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param node Sender node. * @param msg Message. 
*/ - private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { + private void processFullMessage(boolean checkCrd, ClusterNode node, GridDhtPartitionsFullMessage msg) { try { assert exchId.equals(msg.exchangeId()) : msg; assert msg.lastVersion() != null : msg; - synchronized (this) { - if (crd == null) { - log.info("Ignore full message, all server nodes left: " + msg); + if (checkCrd) { + assert node != null; - return; - } - - switch (state) { - case CRD: - case BECOME_CRD: { - log.info("Ignore full message, node is coordinator: " + msg); + synchronized (this) { + if (crd == null) { + log.info("Ignore full message, all server nodes left: " + msg); return; } - case DONE: { - log.info("Ignore full message, future is done: " + msg); + switch (state) { + case CRD: + case BECOME_CRD: { + log.info("Ignore full message, node is coordinator: " + msg); - return; - } + return; + } - case SRV: - case CLIENT: { - if (!crd.equals(node)) { - log.info("Received full message from non-coordinator [node=" + node.id() + + case DONE: { + log.info("Ignore full message, future is done: " + msg); + + return; + } + + case SRV: + case CLIENT: { + if (!crd.equals(node)) { + log.info("Received full message from non-coordinator [node=" + node.id() + ", nodeOrder=" + node.order() + ", crd=" + crd.id() + ", crdOrder=" + crd.order() + ']'); - if (node.order() > crd.order()) - fullMsgs.put(node, msg); + if (node.order() > crd.order()) + fullMsgs.put(node, msg); - return; - } - else { - log.info("Received full message, will finish exchange [node=" + node.id() + - ", resVer=" + msg.resultTopologyVersion() + ']'); + return; + } + else { + log.info("Received full message, will finish exchange [node=" + node.id() + + ", resVer=" + msg.resultTopologyVersion() + ']'); - finishState = new FinishState(crd.id(), - msg.resultTopologyVersion() != null ? msg.resultTopologyVersion() : initialVersion(), - msg); + finishState = new FinishState(crd.id(), + msg.resultTopologyVersion() != null ? 
msg.resultTopologyVersion() : initialVersion(), + msg); - state = ExchangeLocalState.DONE; + state = ExchangeLocalState.DONE; - break; + break; + } } } } } + else + assert node == null : node; if (exchCtx.mergeExchanges()) { if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) { @@ -2722,6 +2801,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte discoCache.updateAlives(node); + InitNewCoordinatorFuture newCrdFut0; + + synchronized (this) { + newCrdFut0 = newCrdFut; + } + + if (newCrdFut0 != null) + newCrdFut0.onNodeLeft(node.id()); + synchronized (this) { if (!srvNodes.remove(node)) return; @@ -2756,9 +2844,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte case SRV: assert crd != null; - if (crdChanged && crd.isLocal()) + if (crdChanged && crd.isLocal()) { state = ExchangeLocalState.BECOME_CRD; + newCrdFut = new InitNewCoordinatorFuture(); + } + break; } @@ -2770,7 +2861,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (crd0 == null) { - assert cctx.kernalContext().clientNode() || cctx.localNode().isDaemon() : cctx.localNode(); + assert cctx.kernalContext().clientNode() : cctx.localNode(); List<ClusterNode> empty = Collections.emptyList(); @@ -2796,20 +2887,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.info("Coordinator failed, node is new coordinator [ver=" + initialVersion() + ", prev=" + node.id() + ']'); - boolean newAff = localJoinExchange(); + assert newCrdFut != null; - IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches( - GridDhtPartitionsExchangeFuture.this, newAff); - if (fut == null || fut.isDone()) - onBecomeCoordinator(); - else { - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - onBecomeCoordinator(); - } - }); - } + newCrdFut.init(GridDhtPartitionsExchangeFuture.this); + + newCrdFut.listen(new CI1<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + onBecomeCoordinator((InitNewCoordinatorFuture)fut); + } + }); return; } @@ -2823,22 +2910,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else { if (crdChanged) { for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) { - log.info("Coordinator changed, process pending full message [" + - "ver=" + initialVersion() + - ", crd=" + node.id() + - ", pendingMsgNode=" + m.getKey() + ']'); + if (crd0.equals(m.getKey())) { + log.info("Coordinator changed, process pending full message [" + + "ver=" + initialVersion() + + ", crd=" + node.id() + + ", pendingMsgNode=" + m.getKey() + ']'); + + processFullMessage(true, m.getKey(), m.getValue()); - processFullMessage(m.getKey(), m.getValue()); + if (isDone()) + return; + } } - if (!isDone()) { - log.info("Coordinator changed, send partitions to new coordinator [" + - "ver=" + initialVersion() + - ", crd=" + node.id() + - ", newCrd=" + crd0.id() + ']'); + log.info("Coordinator changed, send partitions to new coordinator [" + + "ver=" + initialVersion() + + ", crd=" + node.id() + + ", newCrd=" + crd0.id() + ']'); - sendPartitions(crd0); - } + sendPartitions(crd0); } } } @@ -2860,56 +2950,131 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * + * @param newCrdFut Coordinator initialization future. 
*/ - private void onBecomeCoordinator() { - Set<UUID> remaining0 = null; + private void onBecomeCoordinator(InitNewCoordinatorFuture newCrdFut) { + boolean allRcvd = false; - synchronized (this) { - assert crd != null && crd.isLocal(); - - state = ExchangeLocalState.CRD; + if (newCrdFut.restoreState()) { + GridDhtPartitionsFullMessage fullMsg = newCrdFut.fullMessage(); - assert mergedJoinExchMsgs == null; + boolean process = fullMsg == null; - log.info("New coordinator initialization finished [ver=" + initialVersion() + - ", remaining=" + remaining + ']'); + Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs0 = newCrdFut.messages(); - if (!remaining.isEmpty()) - remaining0 = new HashSet<>(remaining); - } + assert msgs.isEmpty() : msgs; - if (remaining0 != null) { - // It is possible that some nodes finished exchange with previous coordinator. - GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId); + for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : msgs0.entrySet()) { + GridDhtPartitionsSingleMessage msg = e.getValue(); - for (UUID nodeId : remaining0) { - try { - if (!singleMsgs.containsKey(nodeId)) { - log.info("New coordinator sends request [ver=" + initialVersion() + - ", node=" + nodeId + ']'); + if (!msg.client()) { + msgs.put(e.getKey().id(), e.getValue()); - cctx.io().send(nodeId, req, SYSTEM_POOL); - } + if (process) + updatePartitionSingleMap(e.getKey().id(), msg); } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Node left during partition exchange [nodeId=" + nodeId + - ", exchId=" + exchId + ']'); + } + + if (fullMsg != null) { + log.info("New coordinator restored state [ver=" + initialVersion() + + ", resVer=" + fullMsg.resultTopologyVersion() + ']'); + + synchronized (this) { + state = ExchangeLocalState.DONE; + + finishState = new FinishState(crd.id(), fullMsg.resultTopologyVersion(), fullMsg); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to request partitions from node: " + nodeId, e); + + processFullMessage(false, null, fullMsg); + + Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = newCrdFut.messages(); + + if (!F.isEmpty(msgs)) { + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; + + for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { + GridDhtPartitionsSingleMessage msg = e.getValue(); + + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + + if (!F.isEmpty(affReq)) { + joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, + fullMsg.resultTopologyVersion(), + affReq, + joinedNodeAff); + } + } + + sendAllPartitions(fullMsg, msgs.keySet(), joinedNodeAff); } + + return; } + else + log.info("New coordinator restore state finished [ver=" + initialVersion() + ']'); - for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet()) { - log.info("New coordinator process pending message [ver=" + initialVersion() + - ", node=" + m.getKey() + ']'); + allRcvd = true; - processSingleMessage(m.getKey(), m.getValue()); + synchronized (this) { + remaining.clear(); // Do not process messages. 
+ + assert crd != null && crd.isLocal(); + + state = ExchangeLocalState.CRD; + + assert mergedJoinExchMsgs == null; } } else { + Set<UUID> remaining0 = null; + + synchronized (this) { + assert crd != null && crd.isLocal(); + + state = ExchangeLocalState.CRD; + + assert mergedJoinExchMsgs == null; + + log.info("New coordinator initialization finished [ver=" + initialVersion() + + ", remaining=" + remaining + ']'); + + if (!remaining.isEmpty()) + remaining0 = new HashSet<>(remaining); + } + + if (remaining0 != null) { + // It is possible that some nodes finished exchange with previous coordinator. + GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId); + + for (UUID nodeId : remaining0) { + try { + if (!pendingSingleMsgs.containsKey(nodeId)) { + log.info("New coordinator sends request [ver=" + initialVersion() + + ", node=" + nodeId + ']'); + + cctx.io().send(nodeId, req, SYSTEM_POOL); + } + } + catch (ClusterTopologyCheckedException ignored) { + if (log.isDebugEnabled()) + log.debug("Node left during partition exchange [nodeId=" + nodeId + + ", exchId=" + exchId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to request partitions from node: " + nodeId, e); + } + } + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : pendingSingleMsgs.entrySet()) { + log.info("New coordinator process pending message [ver=" + initialVersion() + + ", node=" + m.getKey() + ']'); + + processSingleMessage(m.getKey(), m.getValue()); + } + } + } + + if (allRcvd) { awaitSingleMapUpdates(); onAllReceived(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 4c98742..4df6d67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -94,6 +94,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @GridDirectCollection(Integer.class) private Collection<Integer> grpsAffRequest; + /** */ + private GridDhtPartitionsFullMessage finishMsg; + /** * Required by {@link Externalizable}. 
*/ @@ -117,6 +120,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes this.compress = compress; } + public void finishMessage(GridDhtPartitionsFullMessage finishMsg) { + this.finishMsg = finishMsg; + } + + public GridDhtPartitionsFullMessage finishMessage() { + return finishMsg; + } + /** * @param grpsAffRequest */ @@ -394,24 +405,30 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 8: - if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT)) + if (!writer.writeMessage("finishMsg", finishMsg)) return false; writer.incrementState(); case 9: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 10: - if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 11: + if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + return false; + + writer.incrementState(); + + case 12: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -458,7 +475,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 8: - grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT); + finishMsg = reader.readMessage("finishMsg"); if (!reader.isLastRead()) return false; @@ -466,7 +483,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 9: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -474,7 +491,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 10: - partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -482,6 +499,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 11: + partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -501,7 +526,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 12; + return 13; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java new file mode 100644 index 0000000..4f9eee3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; + +/** + * + */ +public class InitNewCoordinatorFuture extends GridCompoundFuture { + /** */ + private GridDhtPartitionsFullMessage fullMsg; + + /** */ + private Set<UUID> awaited = new HashSet<>(); + + /** */ + private Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = new HashMap<>(); + + /** */ + private GridFutureAdapter restoreStateFut; + + /** */ + private IgniteLogger log; + + /** */ + // TODO IGNITE-5578 backward compatibility + private boolean restoreState = true; + + public boolean restoreState() { + return restoreState; + } + + /** + * @param exchFut Current future. + * @throws IgniteCheckedException If failed. + */ + public void init(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { + GridCacheSharedContext cctx = exchFut.sharedContext(); + + log = cctx.logger(getClass()); + + boolean newAff = exchFut.localJoinExchange(); + + IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(exchFut, newAff); + + if (fut != null) + add(fut); + + DiscoCache discoCache = exchFut.discoCache(); + + List<ClusterNode> nodes = new ArrayList<>(); + + synchronized (this) { + for (ClusterNode node : discoCache.allNodes()) { + if (!node.isLocal() && cctx.discovery().alive(node)) { + awaited.add(node.id()); + + nodes.add(node); + } + } + + if (!awaited.isEmpty()) { + restoreStateFut = new GridFutureAdapter(); + + add(restoreStateFut); + } + } + + if (!nodes.isEmpty()) { + // TODO IGNITE-5578: merged nodes. 
+ GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchFut.exchangeId()); + + req.restoreState(true); + + for (ClusterNode node : nodes) { + try { + cctx.io().send(node, req, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions request, node failed: " + node); + + onNodeLeft(node.id()); + } + } + } + + markInitialized(); + } + + /** + * @return Received messages. + */ + Map<ClusterNode, GridDhtPartitionsSingleMessage> messages() { + return msgs; + } + + /** + * @return Full message is some of nodes received it from previous coordinator. + */ + GridDhtPartitionsFullMessage fullMessage() { + return fullMsg; + } + + /** + * @param node Node. + * @param msg Message. + */ + public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { + assert msg.restoreState() : msg; + + boolean done = false; + + synchronized (this) { + if (awaited.remove(node.id())) { + GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage(); + + if (fullMsg0 != null) { + assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion()); + + fullMsg = fullMsg0; + } + + msgs.put(node, msg); + + done = awaited.isEmpty(); + } + } + + if (done) + restoreStateFut.onDone(); + } + + /** + * @param nodeId Failed node ID. + */ + public void onNodeLeft(UUID nodeId) { + boolean done; + + synchronized (this) { + done = awaited.remove(nodeId) && awaited.isEmpty(); + } + + if (done) + restoreStateFut.onDone(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5a663e67/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 3e20be5..3d10b31 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; @@ -37,6 +39,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -119,6 +122,58 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { return ccfg; } + // TODO IGNITE-5578 joined merged node failed (client/server). + // TODO IGNITE-5578 random topology changes, random delay for exchange messages. + // TODO IGNITE-5578 check exchanges/affinity consistency. + + /** + * @throws Exception If failed. 
+ */ + public void testConcurrentStartServers() throws Exception { + concurrentStart(false); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartServersAndClients() throws Exception { + concurrentStart(true); + } + + /** + * @param withClients If {@code true} also starts client nodes. + * @throws Exception If failed. + */ + private void concurrentStart(final boolean withClients) throws Exception { + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + startGrid(0); + + final AtomicInteger idx = new AtomicInteger(1); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + if (withClients) + client.set(ThreadLocalRandom.current().nextBoolean()); + + Ignite node = startGrid(idx.incrementAndGet()); + + checkNodeCaches(node); + + return null; + } + }, 10, "start-node"); + + fut.get(); + + checkCaches(); + + // TODO: stop by one, check caches - in all tests. + stopAllGrids(); + } + } + /** * @throws Exception If failed. */ @@ -242,15 +297,89 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testNoMergeJoinExchangeCoordinatorChange1() throws Exception { - mergeJoinExchangeCoordinatorChange(4, CoordinatorChangeMode.NEW_CRD_RCDV); + public void testJoinExchangeCoordinatorChange_NoMerge_1() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(4, true, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinExchangeCoordinatorChange_NoMerge_2() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(8, true, mode); + + stopAllGrids(); + } } /** * @throws Exception If failed. */ - private void mergeJoinExchangeCoordinatorChange(int srvs, CoordinatorChangeMode mode) throws Exception { - log.info("mergeJoinExchangeCoordinatorChange [nodes=" + srvs + ", mode=" + mode + ']'); + public void testFailExchangeCoordinatorChange_NoMerge_1() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(5, false, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFailExchangeCoordinatorChange_NoMerge_2() throws Exception { + for (CoordinatorChangeMode mode : CoordinatorChangeMode.values()) { + exchangeCoordinatorChangeNoMerge(8, false, mode); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testMergeExchangeCoordinatorChange() throws Exception { + testSpi = true; + + final int srvs = 4; + + Ignite srv0 = startGrids(srvs); + + mergeExchangeWaitVersion(srv0, 6); + + final AtomicInteger idx = new AtomicInteger(srvs); + + CountDownLatch latch = blockExchangeFinish(srv0, 5, F.asList(2, 3, 4, 5), F.asList(1)); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(idx.getAndIncrement()); + + return null; + } + }, 2, "start-node"); + + if (latch != null && !latch.await(5, TimeUnit.SECONDS)) + fail("Failed to wait for expected messages."); + + stopGrid(0); + + fut.get(); + + checkCaches(); + } + + /** + * @param srvs Number of servers. + * @param join If {@code true} starts new node, otherwise stops node. + * @param mode Tested scenario. + * @throws Exception If failed. 
+ */ + private void exchangeCoordinatorChangeNoMerge(int srvs, final boolean join, CoordinatorChangeMode mode) throws Exception { + log.info("Test mergeJoinExchangeCoordinatorChange [nodes=" + srvs + ", mode=" + mode + ']'); testSpi = true; @@ -262,7 +391,10 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { - startGrid(nodes); + if (join) + startGrid(nodes); + else + stopGrid(nodes - 1); return null; } @@ -280,13 +412,19 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { checkCaches(); } + /** + * @param srvs Number of server nodes. + * @param mode Test scenario. + * @return Awaited state latch. + * @throws Exception If failed. + */ private CountDownLatch blockExchangeFinish(int srvs, CoordinatorChangeMode mode) throws Exception { Ignite crd = ignite(0); long topVer = srvs + 1; switch (mode) { - case NON_RCVD: { + case NOBODY_RCVD: { blockExchangeFinish(crd, topVer); break; @@ -314,9 +452,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } /** - * @param srvs - * @param waitNodes - * @return + * @param srvs Number of servers. + * @param waitNodes Nodes which should receive message. + * @return Blocked nodes indexes. */ private List<Integer> blockNodes(int srvs, List<Integer> waitNodes) { List<Integer> block = new ArrayList<>(); @@ -330,20 +468,6 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } /** - * - */ - enum CoordinatorChangeMode { - /** */ - NON_RCVD, - - /** */ - NEW_CRD_RCDV, - - /** */ - NON_CRD_RCVD - } - - /** * @param crd Exchange coordinator. * @param topVer Exchange topology version. */ @@ -366,6 +490,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @param crd Exchange coordinator. * @param topVer Exchange topology version. + * @param blockNodes Nodes which do not receive messages. + * @param waitMsgNodes Nodes which should receive messages. + * @return Awaited state latch. */ private CountDownLatch blockExchangeFinish(Ignite crd, long topVer, @@ -385,7 +512,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { if (msg instanceof GridDhtPartitionsFullMessage) { GridDhtPartitionsFullMessage msg0 = (GridDhtPartitionsFullMessage)msg; - if (msg0.exchangeId() == null || !msg0.exchangeId().topologyVersion().equals(topVer0)) + if (msg0.exchangeId() == null || msg0.exchangeId().topologyVersion().compareTo(topVer0) < 0) return false; String name = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME); @@ -426,8 +553,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void checkCaches() throws Exception { + checkAffinity(); + checkCaches0(); + checkAffinity(); + awaitPartitionMapExchange(); checkCaches0(); @@ -441,25 +572,74 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { assertTrue(nodes.size() > 0); + for (Ignite node : nodes) + checkNodeCaches(node); + } + + /** + * @throws Exception If failed. + */ + private void checkAffinity() throws Exception { + List<Ignite> nodes = G.allGrids(); + + ClusterNode crdNode = null; + + for (Ignite node : nodes) { + ClusterNode locNode = node.cluster().localNode(); + + if (crdNode == null || locNode.order() < crdNode.order()) + crdNode = locNode; + } + + AffinityTopologyVersion topVer = ((IgniteKernal)grid(crdNode)). 
+ context().cache().context().exchange().readyAffinityVersion(); + + Map<String, List<List<ClusterNode>>> affMap = new HashMap<>(); + + for (Ignite node : nodes) { + IgniteKernal node0 = (IgniteKernal)node; + + for (IgniteInternalCache cache : node0.context().cache().caches()) { + List<List<ClusterNode>> aff = affMap.get(cache.name()); + List<List<ClusterNode>> aff0 = cache.context().affinity().assignments(topVer); + + if (aff != null) + assertEquals(aff, aff0); + else + affMap.put(cache.name(), aff0); + } + } + } + + /** + * @param node Node. + */ + private void checkNodeCaches(Ignite node) { String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}; ThreadLocalRandom rnd = ThreadLocalRandom.current(); - for (Ignite node : nodes) { - for (String cacheName : cacheNames) { - IgniteCache<Object, Object> cache = node.cache(cacheName); + for (String cacheName : cacheNames) { + IgniteCache<Object, Object> cache = node.cache(cacheName); - assertNotNull("No cache [node=" + node.name() + - ", order=" + node.cluster().localNode().order() + - ", cache=" + cacheName + ']', cache); + // TODO: multithreaded, putAll and transactions. - for (int i = 0; i < 10; i++) { - Integer key = rnd.nextInt(100_000); + assertNotNull("No cache [node=" + node.name() + + ", client=" + node.configuration().isClientMode() + + ", order=" + node.cluster().localNode().order() + + ", cache=" + cacheName + ']', cache); - cache.put(key, i); + String err = "Invalid value [node=" + node.name() + + ", client=" + node.configuration().isClientMode() + + ", order=" + node.cluster().localNode().order() + + ", cache=" + cacheName + ']'; - assertEquals(i, cache.get(key)); - } + for (int i = 0; i < 10; i++) { + Integer key = rnd.nextInt(100_000); + + cache.put(key, i); + + assertEquals(err, i, cache.get(key)); } } } @@ -480,4 +660,25 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { assertTrue(wait); } + + /** + * + */ + enum CoordinatorChangeMode { + /** + * Coordinator failed, did not send full message. + */ + NOBODY_RCVD, + + /** + * Coordinator failed, but new coordinator received full message + * and finished exchange. + */ + NEW_CRD_RCDV, + + /** + * Coordinator failed, but one of servers (not new coordinator) received full message. + */ + NON_CRD_RCVD + } }
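----------------------------------------------------------------------

The heart of the new InitNewCoordinatorFuture above is a small restore-state handshake: when the coordinator fails mid-exchange, the node that takes over sends a GridDhtPartitionsSingleRequest with the restoreState flag to every alive node, collects their GridDhtPartitionsSingleMessage replies (any of which may carry the full message the old coordinator already managed to send), and only then decides whether to finish the exchange from the restored full message or to proceed as a regular coordinator. Below is a minimal, Ignite-independent sketch of that collection step under simplified types; the class and member names (RestoreStateCollector, replies, restoredFullMsg) are illustrative placeholders rather than actual Ignite API.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * Sketch of the restore-state collection performed by the new coordinator:
 * completes once every awaited node has either replied or left.
 */
public class RestoreStateCollector {
    /** Node IDs a reply is still expected from. */
    private final Set<UUID> awaited = new HashSet<>();

    /** Replies received so far (node ID -> opaque single-message payload). */
    private final Map<UUID, String> replies = new HashMap<>();

    /** Full state restored from a node that already heard from the failed coordinator, if any. */
    private String restoredFullMsg;

    /** Completes when all awaited nodes have replied or left. */
    private final CompletableFuture<Void> done = new CompletableFuture<>();

    /** @param aliveRemoteNodes Nodes whose state must be collected. */
    public synchronized void init(Set<UUID> aliveRemoteNodes) {
        awaited.addAll(aliveRemoteNodes);

        if (awaited.isEmpty())
            done.complete(null);
    }

    /** @param fullMsg Non-null if the sender already received the final state from the old coordinator. */
    public void onMessage(UUID nodeId, String singleMsg, String fullMsg) {
        boolean finished = false;

        synchronized (this) {
            if (awaited.remove(nodeId)) {
                replies.put(nodeId, singleMsg);

                if (fullMsg != null)
                    restoredFullMsg = fullMsg; // Restored full messages must agree on the result version.

                finished = awaited.isEmpty();
            }
        }

        if (finished)
            done.complete(null);
    }

    /** A failed node will never reply, so stop waiting for it. */
    public void onNodeLeft(UUID nodeId) {
        boolean finished;

        synchronized (this) {
            finished = awaited.remove(nodeId) && awaited.isEmpty();
        }

        if (finished)
            done.complete(null);
    }

    /** @return Future completed when collection is finished. */
    public CompletableFuture<Void> future() {
        return done;
    }

    /** @return Restored full state, or {@code null} if nobody saw the old coordinator's final message. */
    public synchronized String restoredFullMessage() {
        return restoredFullMsg;
    }

    /** Tiny usage example: one node replies with the restored full state, the other one fails. */
    public static void main(String[] args) {
        RestoreStateCollector collector = new RestoreStateCollector();

        UUID n1 = UUID.randomUUID();
        UUID n2 = UUID.randomUUID();

        collector.init(new HashSet<>(java.util.Arrays.asList(n1, n2)));

        collector.onMessage(n1, "single-msg-from-n1", "full-msg-from-old-crd");
        collector.onNodeLeft(n2);

        collector.future().join();

        System.out.println("Restored full message: " + collector.restoredFullMessage());
    }
}

Counting down on both onMessage and onNodeLeft mirrors the actual future's restoreStateFut: a node that dies before replying can never block the new coordinator indefinitely.

----------------------------------------------------------------------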