5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b22719e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b22719e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b22719e Branch: refs/heads/ignite-5578 Commit: 7b22719efa96541e2a85449560bff9896712be00 Parents: 8f6be3b Author: sboikov <[email protected]> Authored: Tue Aug 1 12:29:06 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Aug 1 13:25:37 2017 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 2 + .../cluster/GridClusterStateProcessor.java | 2 +- .../CacheLateAffinityAssignmentTest.java | 326 +++++++++++++++++-- ...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +- 4 files changed, 302 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1601f2b..0dcfffc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -214,9 +214,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>(); /** */ + @GridToStringExclude private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs; /** */ + @GridToStringExclude private int awaitMergedMsgs; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 6e94669..009f166 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -498,7 +498,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() + ", topVer=" + topVer + ", client=" + ctx.clientNode() + - ", daemon" + ctx.isDaemon() + "]"); + ", daemon=" + ctx.isDaemon() + "]"); } IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate)); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 4bb7554..16cb625 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; @@ -70,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -587,20 +589,41 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { * * @throws Exception If failed. */ - public void testAffinitySimpleNodeLeave() throws Exception { - startServer(0, 1); + public void testAffinitySimpleNodeLeave1() throws Exception { + affinitySimpleNodeLeave(2); + } - startServer(1, 2); + /** + * Simple test, node leaves. + * + * @throws Exception If failed. + */ + public void testAffinitySimpleNodeLeave2() throws Exception { + affinitySimpleNodeLeave(4); + } - checkAffinity(2, topVer(2, 0), false); + /** + * @param cnt Count of server nodes. + * @throws Exception If failed. + */ + private void affinitySimpleNodeLeave(int cnt) throws Exception { + int topVer = 1; - checkAffinity(2, topVer(2, 1), true); + startServer(topVer - 1, topVer++); - stopNode(1, 3); + for (int i = 0; i < cnt - 1; i++, topVer++) { + startServer(topVer - 1, topVer); - checkAffinity(1, topVer(3, 0), true); + checkAffinity(topVer, topVer(topVer, 0), false); - checkNoExchange(1, topVer(3, 1)); + checkAffinity(topVer, topVer(topVer, 1), true); + } + + stopNode(1, topVer); + + checkAffinity(cnt - 1, topVer(topVer, 0), true); + + checkNoExchange(cnt - 1, topVer(topVer, 1)); awaitPartitionMapExchange(); } @@ -1037,6 +1060,222 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testBlockedFinishMsg1() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 2); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg2() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg3() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(4, 3, false, 1); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg4() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg5() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 1); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg6() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg7() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(5, 3, false, 2, 4); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg8() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(6, 3, false, 2, 4); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsg9() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, false, 4); + } + + /** + * + * @throws Exception If failed. + */ + public void testBlockedFinishMsgForClient() throws Exception { + doTestCoordLeaveBlockedFinishExchangeMessage(5, 1, true, 4); + } + + /** + * Coordinator leaves without sending all {@link GridDhtPartitionsFullMessage} messages, + * exchange must be completed. + * + * @param cnt Number of nodes. + * @param stopId Node to stop. + * @param lastClient {@code True} if last started node is client. + * @param blockedIds Nodes not receiving exchange finish message. + * @throws Exception If failed. + */ + private void doTestCoordLeaveBlockedFinishExchangeMessage(int cnt, + int stopId, + boolean lastClient, + int... blockedIds) throws Exception + { + int ord = 1; + + for (int i = 0; i < cnt; i++) { + if (i == cnt - 1 && lastClient) + startClient(ord - 1, ord++); + else + startServer(ord - 1, ord++); + } + + awaitPartitionMapExchange(); + + TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(grid(0)); + + final Set<String> blocked = new HashSet<>(); + + for (int id : blockedIds) { + String name = grid(id).name(); + + blocked.add(name); + } + + spi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return blocked.contains(node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME)) + && (msg instanceof GridDhtPartitionsFullMessage) + && (((GridDhtPartitionsFullMessage)msg).exchangeId() != null); + } + }); + + checkAffinity(cnt, topVer(ord - 1, 1), true); + + stopNode(stopId, ord); + + AffinityTopologyVersion topVer = topVer(ord, 0); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(cnt); + + List<Ignite> grids = G.allGrids(); + + for (Ignite ignite : grids) + futs.add(affinityReadyFuture(topVer, ignite)); + + assertEquals(futs.size(), grids.size()); + + for (int i = 0; i < futs.size(); i++) { + final IgniteInternalFuture<?> fut = futs.get(i); + + Ignite ignite = grids.get(i); + + if (!blocked.contains(ignite.name())) { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return fut.isDone(); + } + }, 5000); + + assertTrue(ignite.name(), fut.isDone()); + } + else + assertFalse(ignite.name(), fut.isDone()); + } + + ord++; + + stopNode(0, ord); // Triggers exchange completion from new coordinator. + + checkAffinity(cnt - 2, topVer(ord - 1, 0), true, false); + + checkAffinity(cnt - 2, topVer(ord, 0), true); + + awaitPartitionMapExchange(); + } + + /** + * Assignment is delayed, coordinator leaves, nodes must complete exchange with same assignments. + * + * @throws Exception If failed. + */ + public void testCoordinatorLeaveAfterNodeLeavesDelayAssignment() throws Exception { + Ignite ignite0 = startServer(0, 1); + + startServer(1, 2); + + Ignite ignite2 = startServer(2, 3); + + Ignite ignite3 = startServer(3, 4); + + TestRecordingCommunicationSpi spi0 = + (TestRecordingCommunicationSpi) ignite0.configuration().getCommunicationSpi(), spi2, spi3; + + // Prevent exchange completion. + spi0.blockMessages(GridDhtPartitionsFullMessage.class, ignite2.name()); + + // Block rebalance. + blockSupplySend(spi0, CACHE_NAME1); + blockSupplySend((spi2 = TestRecordingCommunicationSpi.spi(ignite2)), CACHE_NAME1); + blockSupplySend((spi3 = TestRecordingCommunicationSpi.spi(ignite3)), CACHE_NAME1); + + stopNode(1, 5); + + AffinityTopologyVersion topVer = topVer(5, 0); + + IgniteInternalFuture<?> fut0 = affinityReadyFuture(topVer, ignite0); + IgniteInternalFuture<?> fut2 = affinityReadyFuture(topVer, ignite2); + IgniteInternalFuture<?> fut3 = affinityReadyFuture(topVer, ignite3); + + U.sleep(1_000); + + assertTrue(fut0.isDone()); + assertFalse(fut2.isDone()); + assertTrue(fut3.isDone()); + + // Finish rebalance on ignite3. + spi2.stopBlock(true); + + stopNode(0, 6); + + spi3.stopBlock(true); + + checkAffinity(2, topVer, false); + } + + /** * Coordinator leaves during node leave exchange. * * @throws Exception If failed. @@ -2181,6 +2420,18 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } /** + * @param topVer Topology version. + * @param node Node. + * @return Exchange future. + */ + private IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion topVer, Ignite node) { + IgniteInternalFuture<?> fut = ((IgniteKernal)node).context().cache().context().exchange(). + affinityReadyFuture(topVer); + + return fut != null ? fut : new GridFinishedFuture<>(); + } + + /** * @param major Major version. * @param minor Minor version. * @return Topology version. @@ -2296,10 +2547,25 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { * @throws Exception If failed. * @return Affinity assignments. */ - @SuppressWarnings("unchecked") private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes, AffinityTopologyVersion topVer, boolean expIdeal) throws Exception { + return checkAffinity(expNodes, topVer, expIdeal, true); + } + + /** + * @param expNodes Expected nodes number. + * @param topVer Topology version. + * @param expIdeal If {@code true} expect ideal affinity assignment. + * @param checkPublicApi {@code True} to check {@link Affinity} API. + * @throws Exception If failed. + * @return Affinity assignments. + */ + @SuppressWarnings("unchecked") + private Map<String, List<List<ClusterNode>>> checkAffinity(int expNodes, + AffinityTopologyVersion topVer, + boolean expIdeal, + boolean checkPublicApi) throws Exception { List<Ignite> nodes = G.allGrids(); Map<String, List<List<ClusterNode>>> aff = new HashMap<>(); @@ -2331,35 +2597,37 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { assertAffinity(ideal, aff2, node, cctx.name(), topVer); - Affinity<Object> cacheAff = node.affinity(cctx.name()); + if (checkPublicApi) { + Affinity<Object> cacheAff = node.affinity(cctx.name()); - for (int i = 0; i < 10; i++) { - int part = cacheAff.partition(i); + for (int i = 0; i < 10; i++) { + int part = cacheAff.partition(i); - List<ClusterNode> partNodes = ideal.get(part); + List<ClusterNode> partNodes = ideal.get(part); - if (partNodes.isEmpty()) { - try { - cacheAff.mapKeyToNode(i); + if (partNodes.isEmpty()) { + try { + cacheAff.mapKeyToNode(i); - fail(); + fail(); + } + catch (IgniteException ignore) { + // No-op. + } } - catch (IgniteException ignore) { - // No-op. + else { + ClusterNode primary = cacheAff.mapKeyToNode(i); + + assertEquals(primary, partNodes.get(0)); } } - else { - ClusterNode primary = cacheAff.mapKeyToNode(i); - assertEquals(primary, partNodes.get(0)); - } - } + for (int p = 0; p < ideal.size(); p++) { + List<ClusterNode> exp = ideal.get(p); + Collection<ClusterNode> partNodes = cacheAff.mapPartitionToPrimaryAndBackups(p); - for (int p = 0; p < ideal.size(); p++) { - List<ClusterNode> exp = ideal.get(p); - Collection<ClusterNode> partNodes = cacheAff.mapPartitionToPrimaryAndBackups(p); - - assertEqualsCollections(exp, partNodes); + assertEqualsCollections(exp, partNodes); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7b22719e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java index cf898c5..0d0cda4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java @@ -362,11 +362,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ); - log.info("Put key1: " + key1); + log.info("Put key1 [key1=" + key1 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key1)) + ']'); cache0.put(key1, key1); - log.info("Put key2: " + key2); + log.info("Put key2 [key2=" + key2 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key2)) + ']'); cache0.put(key2, key2);
