Repository: ignite Updated Branches: refs/heads/master 8d3e37bf2 -> 66fcde3d0
IGNITE-9227 Fixed missing reply to a single message during coordinator failover. Fixes #4518. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66fcde3d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66fcde3d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66fcde3d Branch: refs/heads/master Commit: 66fcde3d0acb88b028024e4a62dc4b2f9ddfe534 Parents: 8d3e37b Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Thu Aug 16 18:39:49 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Thu Aug 16 18:41:12 2018 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 17 +- .../GridCachePartitionExchangeManager.java | 34 ++- .../preloader/CacheGroupAffinityMessage.java | 17 +- .../GridDhtPartitionsExchangeFuture.java | 215 +++++++++++-------- .../preloader/GridDhtPartitionsFullMessage.java | 4 +- .../cache/persistence/file/FilePageStore.java | 2 +- ...rtitionsExchangeCoordinatorFailoverTest.java | 155 +++++++++++++ .../distributed/CacheExchangeMergeTest.java | 2 - .../testsuites/IgniteCacheTestSuite6.java | 3 +- 9 files changed, 349 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 3862523..41f7c63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -70,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -1383,9 +1382,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); - final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity(); + final Map<Integer, CacheGroupAffinityMessage> receivedAff = msg.joinedNodeAffinity(); - assert F.isEmpty(affReq) || (!F.isEmpty(joinedNodeAff) && joinedNodeAff.size() >= affReq.size()) : msg; + assert F.isEmpty(affReq) || (!F.isEmpty(receivedAff) && receivedAff.size() >= affReq.size()) + : ("Requested and received affinity are different " + + "[requestedCnt=" + (affReq != null ? affReq.size() : "none") + + ", receivedCnt=" + (receivedAff != null ? receivedAff.size() : "none") + + ", msg=" + msg + "]"); forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { @@ -1398,7 +1401,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (affReq != null && affReq.contains(aff.groupId())) { assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()); - CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId()); + CacheGroupAffinityMessage affMsg = receivedAff.get(aff.groupId()); assert affMsg != null; @@ -1774,8 +1777,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. * @return Future completed when caches initialization is done. */ - public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut, - final boolean newAff) throws IgniteCheckedException { + public IgniteInternalFuture<?> initCoordinatorCaches( + final GridDhtPartitionsExchangeFuture fut, + final boolean newAff + ) throws IgniteCheckedException { final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); final AffinityTopologyVersion topVer = fut.initialVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f65d338..d5eeaeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -116,6 +116,7 @@ import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -330,6 +331,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class, new MessageHandler<GridDhtPartitionsSingleMessage>() { @Override public void onMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + GridDhtPartitionExchangeId exchangeId = msg.exchangeId(); + + if (exchangeId != null) { + GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchangeId); + + boolean fastReplied = fut.fastReplyOnSingleMessage(node, msg); + + if (fastReplied) { + if (log.isInfoEnabled()) + log.info("Fast replied to single message " + + "[exchId=" + exchangeId + ", nodeId=" + node.id() + "]"); + + return; + } + } + if (!crdInitFut.isDone() && !msg.restoreState()) { GridDhtPartitionExchangeId exchId = msg.exchangeId(); @@ -1363,6 +1380,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Gets exchange future by exchange id. + * + * @param exchId Exchange id. + */ + private GridDhtPartitionsExchangeFuture exchangeFuture(@NotNull GridDhtPartitionExchangeId exchId) { + return exchangeFuture(exchId, null, null, null, null); + } + + /** * @param exchId Exchange ID. * @param discoEvt Discovery event. * @param cache Discovery data cache. @@ -1370,11 +1396,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param affChangeMsg Affinity change message. * @return Exchange future. */ - private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, + private GridDhtPartitionsExchangeFuture exchangeFuture( + @NotNull GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, @Nullable DiscoCache cache, @Nullable ExchangeActions exchActions, - @Nullable CacheAffinityChangeMessage affChangeMsg) { + @Nullable CacheAffinityChangeMessage affChangeMsg + ) { GridDhtPartitionsExchangeFuture fut; GridDhtPartitionsExchangeFuture old = exchFuts.addx( @@ -1571,7 +1599,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana scheduleResendPartitions(); } else { - GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), null, null, null, null); + GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId()); if (log.isDebugEnabled()) log.debug("Notifying exchange future about single message: " + exchFut); http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java index 8a1ffb4..7da4051 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.typedef.F; @@ -144,7 +145,8 @@ public class CacheGroupAffinityMessage implements Message { GridCacheSharedContext cctx, AffinityTopologyVersion topVer, Collection<Integer> affReq, - @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff) { + @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff + ) { assert !F.isEmpty(affReq) : affReq; if (cachesAff == null) @@ -152,7 +154,18 @@ public class CacheGroupAffinityMessage implements Message { for (Integer grpId : affReq) { if (!cachesAff.containsKey(grpId)) { - GridAffinityAssignmentCache aff = cctx.affinity().affinity(grpId); + GridAffinityAssignmentCache aff = cctx.affinity().groupAffinity(grpId); + + // If no coordinator group holder on the node, try fetch affinity from existing cache group. + if (aff == null) { + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + + assert grp != null : "No cache group holder or cache group to create AffinityMessage" + + ". Requested group id: " + grpId + + ". Topology version: " + topVer; + + aff = grp.affinity(); + } List<List<ClusterNode>> assign = aff.readyAssignments(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 28dc8f3..fbd3264 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -24,8 +24,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -37,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.stream.Stream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -1608,74 +1611,90 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @param msg Message to send. + * @param fullMsg Message to send. * @param nodes Nodes. * @param mergedJoinExchMsgs Messages received from merged 'join node' exchanges. - * @param joinedNodeAff Affinity if was requested by some nodes. + * @param affinityForJoinedNodes Affinity if was requested by some nodes. */ private void sendAllPartitions( - GridDhtPartitionsFullMessage msg, + GridDhtPartitionsFullMessage fullMsg, Collection<ClusterNode> nodes, Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs, - Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) { - boolean singleNode = nodes.size() == 1; - - GridDhtPartitionsFullMessage joinedNodeMsg = null; - + Map<Integer, CacheGroupAffinityMessage> affinityForJoinedNodes + ) { assert !nodes.contains(cctx.localNode()); if (log.isDebugEnabled()) { log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + - ", exchId=" + exchId + ", msg=" + msg + ']'); + ", exchId=" + exchId + ", msg=" + fullMsg + ']'); } - for (ClusterNode node : nodes) { - GridDhtPartitionsFullMessage sndMsg = msg; - - if (joinedNodeAff != null) { - if (singleNode) - msg.joinedNodeAffinity(joinedNodeAff); - else { - GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id()); + // Find any single message with affinity request. This request exists only for newly joined nodes. + Optional<GridDhtPartitionsSingleMessage> singleMsgWithAffinityReq = nodes.stream() + .flatMap(node -> Optional.ofNullable(msgs.get(node.id())) + .filter(singleMsg -> singleMsg.cacheGroupsAffinityRequest() != null) + .map(Stream::of) + .orElse(Stream.empty())) + .findAny(); + + // Prepare full message for newly joined nodes with affinity request. + final GridDhtPartitionsFullMessage fullMsgWithAffinity = singleMsgWithAffinityReq + .filter(singleMessage -> affinityForJoinedNodes != null) + .map(singleMessage -> fullMsg.copy().joinedNodeAffinity(affinityForJoinedNodes)) + .orElse(null); + + // Prepare and send full messages for given nodes. + nodes.stream() + .map(node -> { + // No joined nodes, just send a regular full message. + if (fullMsgWithAffinity == null) + return new T2<>(node, fullMsg); + + return new T2<>( + node, + // If single message contains affinity request, use special full message for such single messages. + Optional.ofNullable(msgs.get(node.id())) + .filter(singleMsg -> singleMsg.cacheGroupsAffinityRequest() != null) + .map(singleMsg -> fullMsgWithAffinity) + .orElse(fullMsg) + ); + }) + .map(nodeAndMsg -> { + ClusterNode node = nodeAndMsg.get1(); + GridDhtPartitionsFullMessage fullMsgToSend = nodeAndMsg.get2(); + + // If exchange has merged, use merged version of exchange id. + GridDhtPartitionExchangeId sndExchId = mergedJoinExchMsgs != null + ? Optional.ofNullable(mergedJoinExchMsgs.get(node.id())) + .map(GridDhtPartitionsAbstractMessage::exchangeId) + .orElse(exchangeId()) + : exchangeId(); - if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) { - if (joinedNodeMsg == null) { - joinedNodeMsg = msg.copy(); + if (sndExchId != null && !sndExchId.equals(exchangeId())) { + GridDhtPartitionsFullMessage fullMsgWithUpdatedExchangeId = fullMsgToSend.copy(); - joinedNodeMsg.joinedNodeAffinity(joinedNodeAff); - } + fullMsgWithUpdatedExchangeId.exchangeId(sndExchId); - sndMsg = joinedNodeMsg; - } + return new T2<>(node, fullMsgWithUpdatedExchangeId); } - } - - try { - GridDhtPartitionExchangeId sndExchId = exchangeId(); - if (mergedJoinExchMsgs != null) { - GridDhtPartitionsSingleMessage mergedMsg = mergedJoinExchMsgs.get(node.id()); + return new T2<>(node, fullMsgToSend); + }) + .forEach(nodeAndMsg -> { + ClusterNode node = nodeAndMsg.get1(); + GridDhtPartitionsFullMessage fullMsgToSend = nodeAndMsg.get2(); - if (mergedMsg != null) - sndExchId = mergedMsg.exchangeId(); + try { + cctx.io().send(node, fullMsgToSend, SYSTEM_POOL); } - - if (sndExchId != null && !sndExchId.equals(exchangeId())) { - sndMsg = sndMsg.copy(); - - sndMsg.exchangeId(sndExchId); + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); } - - cctx.io().send(node, sndMsg, SYSTEM_POOL); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send partitions, node failed: " + node); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partitions [node=" + node + ']', e); - } - } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partitions [node=" + node + ']', e); + } + }); } /** @@ -2213,6 +2232,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Tries to fast reply with {@link GridDhtPartitionsFullMessage} on received single message + * in case of exchange future has already completed. + * + * @param node Cluster node which sent single message. + * @param msg Single message. + * @return {@code true} if fast reply succeed. + */ + public boolean fastReplyOnSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + GridDhtPartitionsExchangeFuture futToFastReply = this; + + ExchangeLocalState currState; + + synchronized (mux) { + currState = state; + + if (currState == ExchangeLocalState.MERGED) + futToFastReply = mergedWith; + } + + if (currState == ExchangeLocalState.DONE) + futToFastReply.processSingleMessage(node.id(), msg); + else if (currState == ExchangeLocalState.MERGED) + futToFastReply.processMergedMessage(node, msg); + + return currState == ExchangeLocalState.MERGED || currState == ExchangeLocalState.DONE; + } + + /** * @param nodeId Node ID. * @param msg Client's message. */ @@ -2931,9 +2978,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte synchronized (mux) { srvNodes.remove(cctx.localNode()); - nodes = U.newHashSet(srvNodes.size()); - - nodes.addAll(srvNodes); + nodes = new LinkedHashSet<>(srvNodes); mergedJoinExchMsgs0 = mergedJoinExchMsgs; @@ -3094,48 +3139,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) { ClusterNode node = cctx.node(nodeId); - if (node != null) { - GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy(); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + nodeId); - Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + return; + } - if (affReq != null) { - Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages( - cctx, - finishState.resTopVer, - affReq, - null); + GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy(); - fullMsg.joinedNodeAffinity(aff); - } + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); - if (!fullMsg.exchangeId().equals(msg.exchangeId())) { - fullMsg = fullMsg.copy(); + if (affReq != null) { + Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages( + cctx, + finishState.resTopVer, + affReq, + null); - fullMsg.exchangeId(msg.exchangeId()); - } + fullMsg.joinedNodeAffinity(aff); + } - try { - cctx.io().send(node, fullMsg, SYSTEM_POOL); + if (!fullMsg.exchangeId().equals(msg.exchangeId())) { + fullMsg = fullMsg.copy(); - if (log.isDebugEnabled()) { - log.debug("Full message was sent to node: " + - node + - ", fullMsg: " + fullMsg - ); - } - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send partitions, node failed: " + node); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partitions [node=" + node + ']', e); - } + fullMsg.exchangeId(msg.exchangeId()); } - else if (log.isDebugEnabled()) - log.debug("Failed to send partitions, node failed: " + nodeId); + try { + cctx.io().send(node, fullMsg, SYSTEM_POOL); + + if (log.isDebugEnabled()) { + log.debug("Full message was sent to node: " + + node + + ", fullMsg: " + fullMsg + ); + } + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partitions [node=" + node + ']', e); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 5962468..ab45d8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -218,8 +218,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param joinedNodeAff Caches affinity for joining nodes. */ - void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) { + GridDhtPartitionsFullMessage joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) { this.joinedNodeAff = joinedNodeAff; + + return this; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index ca31f05..c4f90e5 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -165,7 +165,7 @@ public class FilePageStore implements PageStore { try { ByteBuffer hdr = header(type, dbCfg.getPageSize()); - fileIO.writeFully(hdr); + fileIO.writeFully(hdr); //there is 'super' page in every file return headerSize() + dbCfg.getPageSize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java new file mode 100644 index 0000000..a2adcf7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +/** + * Advanced coordinator failure scenarios during PME. + */ +public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName); + + cfg.setCacheConfiguration( + new CacheConfiguration("cache-" + igniteInstanceName) + .setBackups(1) + .setNodeFilter(nodeFilter) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + ); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60 * 1000L; + } + + /** + * Tests that new coordinator is able to finish old exchanges in case of in-complete coordinator initialization. + */ + public void testNewCoordinatorCompletedExchange() throws Exception { + IgniteEx crd = (IgniteEx) startGrid("crd"); + + IgniteEx newCrd = startGrid(1); + + crd.cluster().active(true); + + // 3 node join topology version. + AffinityTopologyVersion joinThirdNodeVer = new AffinityTopologyVersion(3, 0); + + // 4 node join topology version. + AffinityTopologyVersion joinFourNodeVer = new AffinityTopologyVersion(4, 0); + + // Block FullMessage for newly joined nodes. + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd); + + final CountDownLatch sendFullMsgLatch = new CountDownLatch(1); + + // Delay sending full message to newly joined nodes. + spi.blockMessages((node, msg) -> { + if (msg instanceof GridDhtPartitionsFullMessage && node.order() > 2) { + try { + sendFullMsgLatch.await(); + } + catch (Throwable ignored) { } + + return true; + } + + return false; + }); + + IgniteInternalFuture joinTwoNodesFut = GridTestUtils.runAsync(() -> startGridsMultiThreaded(2, 2)); + + GridCachePartitionExchangeManager exchangeMgr = newCrd.context().cache().context().exchange(); + + // Wait till new coordinator finishes third node join exchange. + GridTestUtils.waitForCondition( + () -> exchangeMgr.readyAffinityVersion().compareTo(joinThirdNodeVer) >= 0, + getTestTimeout() + ); + + IgniteInternalFuture startLastNodeFut = GridTestUtils.runAsync(() -> startGrid(5)); + + // Wait till new coordinator starts third node join exchange. + GridTestUtils.waitForCondition( + () -> exchangeMgr.lastTopologyFuture().initialVersion().compareTo(joinFourNodeVer) >= 0, + getTestTimeout() + ); + + IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid("crd", true, false)); + + // Magic sleep to make sure that coordinator stop process has started. + U.sleep(1000); + + // Resume full messages sending to unblock coordinator stopping process. + sendFullMsgLatch.countDown(); + + // Coordinator stop should succeed. + stopCrdFut.get(); + + // Nodes join should succeed. + joinTwoNodesFut.get(); + + startLastNodeFut.get(); + + awaitPartitionMapExchange(); + + // Check that all caches are operable. + for (Ignite grid : G.allGrids()) { + IgniteCache cache = grid.cache("cache-" + grid.cluster().localNode().consistentId()); + + Assert.assertNotNull(cache); + + cache.put(0, 0); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index b255066..1183634 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -634,8 +634,6 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-9227"); - cfgCache = false; final Ignite srv0 = startGrids(2); http://git-wip-us.apache.org/repos/asf/ignite/blob/66fcde3d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index b52e339..f9e6b81 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest; +import org.apache.ignite.internal.processors.cache.PartitionsExchangeCoordinatorFailoverTest; import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest; import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest; @@ -109,10 +110,10 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(IgniteExchangeLatchManagerCoordinatorFailTest.class); + suite.addTestSuite(PartitionsExchangeCoordinatorFailoverTest.class); suite.addTestSuite(CacheTryLockMultithreadedTest.class); //suite.addTestSuite(CacheClientsConcurrentStartTest.class); - //suite.addTestSuite(CacheTryLockMultithreadedTest.class); //suite.addTestSuite(GridCacheRebalancingOrderingTest.class); //suite.addTestSuite(IgniteCacheClientMultiNodeUpdateTopologyLockTest.class);