IGNITE-8179 Resolver must not kill all server nodes; ZookeeperDiscoverySpiTest#testCommunicationFailureResolve_KillRandom always failed on TC - Fixes #4177.
Signed-off-by: Dmitriy Pavlov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6f731f5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6f731f5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6f731f5 Branch: refs/heads/ignite-8446 Commit: f6f731f575290b10d6d6bcb6869bb0a1b470455e Parents: 251c91b Author: Vitaliy Biryukov <[email protected]> Authored: Wed Aug 1 18:26:27 2018 +0300 Committer: Dmitriy Pavlov <[email protected]> Committed: Wed Aug 1 18:26:27 2018 +0300 ---------------------------------------------------------------------- .../zk/internal/ZookeeperDiscoverySpiTest.java | 112 +++++++++++++++---- 1 file changed, 90 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f6f731f5/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index 59577c8..24b61d7 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -30,11 +30,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -99,6 +99,7 @@ import 
org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.gridfunc.PredicateMapView; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -179,7 +180,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { private static ThreadLocal<Boolean> clientThreadLoc = new ThreadLocal<>(); /** */ - private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>(); + private static ConcurrentHashMap<UUID, Map<T2<Integer, Long>, DiscoveryEvent>> evts = new ConcurrentHashMap<>(); /** */ private static volatile boolean err; @@ -232,6 +233,9 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** */ private String zkRootPath; + /** The number of clusters started in one test (increments when the first node in the cluster starts). */ + private final AtomicInteger clusterNum = new AtomicInteger(0); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(final String igniteInstanceName) throws Exception { if (testSockNio) @@ -304,6 +308,25 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); + if (cfg.isClientMode()) { + UUID currNodeId = cfg.getNodeId(); + + lsnrs.put(new IgnitePredicate<Event>() { + /** Last remembered uuid before node reconnected. 
*/ + private UUID nodeId = currNodeId; + + @Override public boolean apply(Event evt) { + if(evt.type() == EVT_CLIENT_NODE_RECONNECTED){ + evts.remove(nodeId); + + nodeId = evt.node().id(); + } + + return false; + } + }, new int[] {EVT_CLIENT_NODE_RECONNECTED}); + } + lsnrs.put(new IgnitePredicate<Event>() { /** */ @IgniteInstanceResource @@ -316,10 +339,10 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { UUID locId = ((IgniteKernal)ignite).context().localNodeId(); - Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId); + Map<T2<Integer, Long>, DiscoveryEvent> nodeEvts = evts.get(locId); if (nodeEvts == null) { - Object old = evts.put(locId, nodeEvts = new TreeMap<>()); + Object old = evts.put(locId, nodeEvts = new LinkedHashMap<>()); assertNull(old); @@ -329,13 +352,18 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { synchronized (nodeEvts) { DiscoveryLocalJoinData locJoin = ((IgniteEx)ignite).context().discovery().localJoin(); - nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); + if (locJoin.event().node().order() == 1) + clusterNum.incrementAndGet(); + + nodeEvts.put(new T2<>(clusterNum.get(), locJoin.event().topologyVersion()), + locJoin.event()); } } } synchronized (nodeEvts) { - DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); + DiscoveryEvent old = nodeEvts.put(new T2<>(clusterNum.get(), discoveryEvt.topologyVersion()), + discoveryEvt); assertNull(old); } @@ -2972,8 +3000,6 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { * @throws Exception If failed. 
*/ public void testCommunicationFailureResolve_KillRandom() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-8179"); - sesTimeout = 2000; testCommSpi = true; @@ -2985,6 +3011,10 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { startGridsMultiThreaded(10, 5); + int nodesCnt = 15; + + waitForTopology(nodesCnt); + int nodeIdx = 15; for (int i = 0; i < 10; i++) { @@ -3007,11 +3037,15 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { // No-op. } - clientMode(ThreadLocalRandom.current().nextBoolean()); + boolean clientMode = ThreadLocalRandom.current().nextBoolean(); + + clientMode(clientMode); startGrid(nodeIdx++); - awaitPartitionMapExchange(); + nodesCnt = nodesCnt - KillRandomCommunicationFailureResolver.LAST_KILLED_NODES.size() + 1; + + waitForTopology(nodesCnt); } } @@ -4573,13 +4607,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private void checkEventsConsistency() { - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { + for (Map.Entry<UUID, Map<T2<Integer, Long>, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { UUID nodeId = nodeEvtEntry.getKey(); - Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); + Map<T2<Integer, Long>, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { + for (Map.Entry<UUID, Map<T2<Integer, Long>, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { if (!nodeId.equals(nodeEvtEntry0.getKey())) { - Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); + Map<T2<Integer, Long>, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); synchronized (nodeEvts) { synchronized (nodeEvts0) { @@ -4595,20 +4629,39 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { * @param evts1 Received events. 
* @param evts2 Received events. */ - private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) { - for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) { + private void checkEventsConsistency(Map<T2<Integer, Long>, DiscoveryEvent> evts1, Map<T2<Integer, Long>, DiscoveryEvent> evts2) { + for (Map.Entry<T2<Integer, Long>, DiscoveryEvent> e1 : evts1.entrySet()) { DiscoveryEvent evt1 = e1.getValue(); DiscoveryEvent evt2 = evts2.get(e1.getKey()); if (evt2 != null) { assertEquals(evt1.topologyVersion(), evt2.topologyVersion()); - assertEquals(evt1.eventNode(), evt2.eventNode()); - assertEquals(evt1.topologyNodes(), evt2.topologyNodes()); + assertEquals(evt1.eventNode().consistentId(), evt2.eventNode().consistentId()); + assertTrue(equalsTopologies(evt1.topologyNodes(), evt2.topologyNodes())); } } } /** + * @param nodes1 Nodes. + * @param nodes2 Nodes to be compared with {@code nodes1} for equality. + * + * @return True if nodes equal by consistent id. + */ + private boolean equalsTopologies(Collection<ClusterNode> nodes1, Collection<ClusterNode> nodes2) { + if(nodes1.size() != nodes2.size()) + return false; + + Set<Object> consistentIds1 = nodes1.stream() + .map(ClusterNode::consistentId) + .collect(Collectors.toSet()); + + return nodes2.stream() + .map(ClusterNode::consistentId) + .allMatch(consistentIds1::contains); + } + + /** * @param node Node. * @return Node's discovery SPI. 
*/ @@ -4680,7 +4733,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public boolean apply() { - Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); + Map<T2<Integer, Long>, DiscoveryEvent> nodeEvts = evts.get(nodeId); if (nodeEvts == null) { info("No events for node: " + nodeId); @@ -4690,7 +4743,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { synchronized (nodeEvts) { for (DiscoveryEvent expEvt : expEvts) { - DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion()); + DiscoveryEvent evt0 = nodeEvts.get(new T2<>(clusterNum.get(), expEvt.topologyVersion())); if (evt0 == null) { info("No event for version: " + expEvt.topologyVersion()); @@ -5197,12 +5250,17 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { } }; + /** Last killed nodes. */ + static final Set<ClusterNode> LAST_KILLED_NODES = new HashSet<>(); + /** */ @LoggerResource private IgniteLogger log; /** {@inheritDoc} */ @Override public void resolve(CommunicationFailureContext ctx) { + LAST_KILLED_NODES.clear(); + List<ClusterNode> nodes = ctx.topologySnapshot(); ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -5211,16 +5269,26 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { log.info("Resolver kills nodes [total=" + nodes.size() + ", kill=" + killNodes + ']'); + long srvCnt = nodes.stream().filter(node -> !node.isClient()).count(); + Set<Integer> idxs = new HashSet<>(); - while (idxs.size() < killNodes) - idxs.add(rnd.nextInt(nodes.size())); + while (idxs.size() < killNodes) { + int idx = rnd.nextInt(nodes.size()); + + if(!nodes.get(idx).isClient() && !idxs.contains(idx) && --srvCnt < 1) + continue; + + idxs.add(idx); + } for (int idx : idxs) { ClusterNode node = nodes.get(idx); log.info("Resolver kills node: " + node.id()); + 
LAST_KILLED_NODES.add(node); + ctx.killNode(node); } }
