zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ed2564a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ed2564a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ed2564a Branch: refs/heads/ignite-zk Commit: 6ed2564a8d68e651cb776e13302d62f415938bea Parents: 9970b95 Author: sboikov <[email protected]> Authored: Mon Nov 13 14:24:54 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 13 14:24:54 2017 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 18 ++++---- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 47 ++++++++++++++------ .../zk/ZookeeperDiscoverySpiBasicTest.java | 36 ++++++++++++++- 3 files changed, 77 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 49425ce..04683ac 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -478,19 +478,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (rmtNode == null) { DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi(); - assert discoverySpi instanceof TcpDiscoverySpi; - - TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; + boolean unknownNode = true; - ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); + if (discoverySpi instanceof TcpDiscoverySpi) { + TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; - boolean unknownNode = true; + ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); - if (node0 != null) { - assert node0.isClient() : node0; + if (node0 != null) { + assert node0.isClient() : node0; - if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) - unknownNode = false; + if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) + unknownNode = false; + } } if (unknownNode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index f36a6e2..85b0aa5 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; @@ -36,6 +37,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -415,9 +417,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery List<OpResult> res = zk.multi(joinOps); - log.info("Waiting for local join event."); + log.info("Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); + + for(;;) { + if (!joinLatch.await(10, TimeUnit.SECONDS)) { + U.warn(log, "Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); + } + else + break; + } - joinLatch.await(); } catch (Exception e) { throw new IgniteSpiException(e); @@ -572,13 +581,15 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (joinHist.put(data.order, data) == null) { try { + log.info("New joined node data: " + data); + byte[] bytes = zk.getData(path + "/" + child, null, null); assert bytes.length > 0; ZKJoiningNodeData joinData = unmarshal(bytes); - assert joinData.node != null; + assert joinData.node != null && joinData.joiningNodeData != null : joinData; joinData.node.order(data.order); @@ -697,6 +708,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery joinData.node, new ArrayList<>(curTop.values())); + log.info("ZK event [type=JOIN, node=" + joinData.node.id() + ", ver=" + v + ']'); + if (!joinData.node.id().equals(locNode.nodeId)) { DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(joinData.node.id()); @@ -720,6 +733,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery assert failedNode != null : data.order; + log.info("ZK event [type=FAIL, node=" + failedNode.id() + ", ver=" + v + ']'); + evts.put(v, new ZKDiscoveryEvent(EventType.EVT_NODE_FAILED, v, failedNode, @@ -735,6 +750,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery assert failedNode != null : oldData.order; + log.info("ZK event [type=FAIL, node=" + failedNode.id() + ", ver=" + v + ']'); + evts.put(v, new ZKDiscoveryEvent(EventType.EVT_NODE_FAILED, v, failedNode, @@ -747,7 +764,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } } - log.info("Generated discovery events on coordinator: " + evts); + log.info("Generated discovery events on coordinator [vers=" + evts.keySet() + ", evts=" + evts + ']'); ZKDiscoveryEvents newEvents; @@ -761,9 +778,13 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery else { TreeMap<Integer, ZKDiscoveryEvent> evts0 = new TreeMap<>(curCrdEvts.evts); - evts0.putAll(evts); + for (ZKDiscoveryEvent e : evts.values()) { + assert !evts0.containsKey(e.topVer) : "[newEvt=" + e + ", oldEvt=" + evts0.get(e.topVer) + ']'; - newEvents = new ZKDiscoveryEvents(newNodes, evts); + evts0.put(e.topVer, e); + } + + newEvents = new ZKDiscoveryEvents(newNodes, evts0); expVer = curCrdEvts.ver; } @@ -774,9 +795,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery zk.setData(EVENTS_PATH, marshal(newEvents), expVer); } catch (Exception e) { - log.info("Events update error: " + e); - - e.printStackTrace(System.out); + U.error(log, "Events update error: " + e, e); } curCrdEvts = newEvents; @@ -953,19 +972,19 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery static class ZKDiscoveryEvent implements Serializable { /** */ @GridToStringInclude - final int evtType; + final int topVer; /** */ @GridToStringInclude - final ZookeeperClusterNode node; + final int evtType; /** */ @GridToStringInclude - final List<ZookeeperClusterNode> allNodes; + final ZookeeperClusterNode node; /** */ - @GridToStringInclude - final int topVer; + @GridToStringExclude + final List<ZookeeperClusterNode> allNodes; /** */ Map<Integer, Serializable> joiningNodeData; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 6e6c528..5e1a200 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -42,6 +42,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private static final boolean USE_TEST_CLUSTER = true; + /** */ + private boolean client; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -68,6 +71,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { cfg.setMarshaller(new JdkMarshaller()); + cfg.setClientMode(client); + return cfg; } @@ -155,7 +160,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { for (Ignite node : G.allGrids()) node.compute().broadcast(new DummyCallable(null)); - awaitPartitionMapExchange(); + //awaitPartitionMapExchange(); } /** @@ -194,6 +199,35 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testStartStopWithClients() throws Exception { + final int SRVS = 3; + + startGrids(SRVS); + + client = true; + + final int THREADS = 30; + + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + + startGridsMultiThreaded(SRVS, THREADS); + + waitForTopology(SRVS + THREADS); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + stopGrid(idx + 3); + } + }, THREADS, "stop-node"); + + waitForTopology(SRVS); + } + } + + /** * @param expSize Expected nodes number. * @throws Exception If failed. */
