Repository: ignite Updated Branches: refs/heads/ignite-zk b361a7802 -> 804c84171
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/804c8417 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/804c8417 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/804c8417 Branch: refs/heads/ignite-zk Commit: 804c84171cc75e53bed549d13d5af6858786d9a7 Parents: b361a78 Author: sboikov <[email protected]> Authored: Mon Nov 13 15:25:25 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 13 15:25:25 2017 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 64 +++++----- .../zk/ZookeeperDiscoverySpiBasicTest.java | 116 +++++++++++++++++++ 2 files changed, 149 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/804c8417/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 85b0aa5..4059b0b 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -849,53 +849,55 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery assert lastEvt == null || lastEvt.topVer + 1 == e.topVer : "lastEvt=" + lastEvt + ", nextEvt=" + e; if (!crd) { - if (locJoin) { - for (ZookeeperClusterNode node : e.allNodes) { - assert node.order() > 0 : node; + synchronized (curTop) { + if (locJoin) { + for (ZookeeperClusterNode node : e.allNodes) { + assert node.order() > 0 : node; - Object old = curTop.put(node.order(), node); + Object old = curTop.put(node.order(), node); - assert old == null : node; - } + assert old == null : node; + } - DiscoveryDataBag dataBag = new DiscoveryDataBag(e.node.id()); + DiscoveryDataBag dataBag = new DiscoveryDataBag(e.node.id()); - dataBag.joiningNodeData(e.joiningNodeData); - dataBag.commonData(e.commonData); + dataBag.joiningNodeData(e.joiningNodeData); + dataBag.commonData(e.commonData); - exchange.onExchange(dataBag); - } - else { - switch (e.evtType) { - case EventType.EVT_NODE_JOINED: { - ZookeeperClusterNode node = e.node; + exchange.onExchange(dataBag); + } + else { + switch (e.evtType) { + case EventType.EVT_NODE_JOINED: { + ZookeeperClusterNode node = e.node; - DiscoveryDataBag dataBag = new DiscoveryDataBag(e.node.id()); + DiscoveryDataBag dataBag = new DiscoveryDataBag(e.node.id()); - dataBag.joiningNodeData(e.joiningNodeData); - dataBag.commonData(e.commonData); + dataBag.joiningNodeData(e.joiningNodeData); + dataBag.commonData(e.commonData); - exchange.onExchange(dataBag); + exchange.onExchange(dataBag); - Object old = curTop.put(node.order(), node); + Object old = curTop.put(node.order(), node); - assert old == null : node; + assert old == null : node; - break; - } + break; + } - case EventType.EVT_NODE_FAILED: { - ZookeeperClusterNode node = e.node; + case EventType.EVT_NODE_FAILED: { + ZookeeperClusterNode node = e.node; - Object failedNode = curTop.remove(node.order()); + Object failedNode = curTop.remove(node.order()); - assert failedNode != null : node; + assert failedNode != null : node; - break; - } + break; + } - default: - assert false : e; + default: + assert false : e; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/804c8417/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 5e1a200..f32a7e6 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -17,21 +17,36 @@ package org.apache.ignite.spi.discovery.zk; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; + /** * */ @@ -45,6 +60,12 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private boolean client; + /** */ + private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>(); + + /** */ + private static volatile boolean err; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -73,6 +94,47 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { cfg.setClientMode(client); + Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); + + lsnrs.put(new IgnitePredicate<Event>() { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + @Override public boolean apply(Event evt) { + try { + DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; + + UUID locId = ignite.cluster().localNode().id(); + + Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId); + + if (nodeEvts == null) { + Object old = evts.put(locId, nodeEvts = new TreeMap<>()); + + assertNull(old); + + DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); + + nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); + } + + DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); + + assertNull(old); + } + catch (Throwable e) { + err = true; + + info("Unexpected error: " + e); + } + + return true; + } + }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT}); + + cfg.setLocalEventListeners(lsnrs); + return cfg; } @@ -80,6 +142,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @Override protected void beforeTest() throws Exception { super.beforeTest(); + err = false; + + evts.clear(); + if (USE_TEST_CLUSTER) { zkCluster = new TestingCluster(1); zkCluster.start(); @@ -96,6 +162,54 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } super.afterTest(); + + assertFalse("Unexpected error, see log for details", err); + + checkEventsConsistency(); + + evts.clear(); + } + + /** + * @throws Exception If failed. + */ + private void checkEventsConsistency() throws Exception { + for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { + UUID nodeId = nodeEvtEntry.getKey(); + Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); + + for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { + if (!nodeId.equals(nodeEvtEntry0.getKey())) { + Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); + + checkEventsConsistency(nodeEvts, nodeEvts0); + } + } + } + } + + /** + * @param evts1 Received events. + * @param evts2 Received events. + */ + private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) { + for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) { + DiscoveryEvent evt1 = e1.getValue(); + DiscoveryEvent evt2 = evts2.get(e1.getKey()); + + if (evt2 != null) { + assertEquals(evt1.topologyVersion(), evt2.topologyVersion()); + assertEquals(evt1.eventNode(), evt2.eventNode()); + assertEquals(evt1.topologyNodes(), evt2.topologyNodes()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore1() throws Exception { + } /** @@ -224,6 +338,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { }, THREADS, "stop-node"); waitForTopology(SRVS); + + checkEventsConsistency(); } }
