Repository: ignite Updated Branches: refs/heads/ignite-zk 45e7e4060 -> a4be5afd0
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4be5afd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4be5afd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4be5afd Branch: refs/heads/ignite-zk Commit: a4be5afd03fef3b3fa8ce8c017d24636859c82f3 Parents: 45e7e40 Author: sboikov <[email protected]> Authored: Mon Nov 20 13:27:23 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 20 15:30:19 2017 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 10 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 98 ++++-- .../zk/ZookeeperDiscoverySpiBasicTest.java | 335 +++++++++++-------- .../zookeeper/ZkTestClientCnxnSocketNIO.java | 12 +- 4 files changed, 281 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 92c4f54..75f4f36 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.discovery.JoiningNodesAware; @@ -258,15 +259,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** - * For testing only. - * - * @throws Exception If failed. - */ - public void waitConnectStart() throws Exception { - //connectStart.await(); - } - - /** * @return Local node instance. */ private ZookeeperClusterNode initLocalNode() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 4378c6f..9689762 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import org.apache.curator.utils.PathUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -97,6 +98,12 @@ public class ZookeeperDiscoveryImpl { /** */ private long gridStartTime; + /** */ + private long lastProcEvt = -1; + + /** */ + private boolean joined; + /** * @param log * @param basePath @@ -284,15 +291,29 @@ public class ZookeeperDiscoveryImpl { zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - nodeConnected(rc, children); + onConnected(rc, children); } }); + + connStartLatch.countDown(); } catch (ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); } } + /** TODO ZK */ + private final CountDownLatch connStartLatch = new CountDownLatch(1); + + /** + * For testing only. + * + * @throws Exception If failed. + */ + public void waitConnectStart() throws Exception { + connStartLatch.await(); + } + /** */ private ZkDiscoveryEventsData evts; @@ -303,10 +324,15 @@ public class ZookeeperDiscoveryImpl { * @param rc Async callback result. * @param aliveNodes Alive nodes. */ - private void nodeConnected(int rc, List<String> aliveNodes) { + private void onConnected(int rc, List<String> aliveNodes) { + assert !joined; + + checkIsCoordinator(rc, aliveNodes); + } + + private void checkIsCoordinator(int rc, List<String> aliveNodes) { try { assert rc == 0 : rc; - assert !joined; TreeMap<Integer, String> alives = new TreeMap<>(); @@ -339,13 +365,18 @@ public class ZookeeperDiscoveryImpl { assert prevE != null; + final int crdInternalId = crdE.getKey(); final int locInternalId0 = locInternalId; + log.info("Discovery coordinator already exists, watch for previous node [" + + "locId=" + locNode.id() + + ", prevPath=" + prevE.getValue() + ']'); + zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), new Watcher() { @Override public void process(WatchedEvent evt) { if (evt.getType() == Event.EventType.NodeDeleted) { try { - onPreviousCoordinatorFail(locInternalId0); + onPreviousNodeFail(crdInternalId, locInternalId0); } catch (Throwable e) { onFatalError(e); @@ -358,7 +389,7 @@ public class ZookeeperDiscoveryImpl { if (stat == null) { try { - onPreviousCoordinatorFail(locInternalId0); + onPreviousNodeFail(crdInternalId, locInternalId0); } catch (Throwable e) { onFatalError(e); @@ -373,11 +404,23 @@ public class ZookeeperDiscoveryImpl { } } - private void onPreviousCoordinatorFail(int locInternalId) throws Exception { - if (log.isInfoEnabled()) - log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']'); + private void onPreviousNodeFail(int crdInternalId, int locInternalId) throws Exception { + if (locInternalId == crdInternalId + 1) { + if (log.isInfoEnabled()) + log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']'); + + onBecomeCoordinator(locInternalId); + } + else { + if (log.isInfoEnabled()) + log.info("Previous node failed, check is node new coordinator [locId=" + locNode.id() + ']'); - onBecomeCoordinator(locInternalId); + zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { + @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + checkIsCoordinator(rc, children); + } + }); + } } private void onBecomeCoordinator(int locInternalId) throws Exception { @@ -408,6 +451,9 @@ public class ZookeeperDiscoveryImpl { private void generateTopologyEvents(List<String> aliveNodes) throws Exception { assert crd; + if (log.isInfoEnabled()) + log.info("Process alive nodes change: " + aliveNodes); + TreeMap<Integer, String> alives = new TreeMap<>(); TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(top.nodesByOrder); @@ -599,12 +645,6 @@ public class ZookeeperDiscoveryImpl { generateTopologyEvents(children); } - /** */ - private long lastProcEvt = -1; - - /** */ - private boolean joined; - private void onEventsUpdate(byte[] data, Stat stat) throws Exception { if (data.length == 0) return; @@ -627,10 +667,6 @@ public class ZookeeperDiscoveryImpl { for (Map.Entry<Long, ZkDiscoveryEventData> e : evts.tailMap(lastProcEvt, false).entrySet()) { ZkDiscoveryEventData evtData = e.getValue(); - if (log.isInfoEnabled()) { - log.info("New discovery event data: " + evtData + ']'); - } - if (!joined) { if (evtData.eventType() != EventType.EVT_NODE_JOINED) continue; @@ -643,6 +679,9 @@ public class ZookeeperDiscoveryImpl { locNode.id().equals(joinedId); if (locJoin) { + if (log.isInfoEnabled()) + log.info("Local join event data: " + evtData + ']'); + String path = zkPaths.evtsPath + "/" + evtData.topologyVersion() + "/joined"; ZkJoinEventDataForJoined dataForJoined = unmarshal(zkClient.getData(path)); @@ -680,6 +719,9 @@ public class ZookeeperDiscoveryImpl { } } else { + if (log.isInfoEnabled()) + log.info("New discovery event data: " + evtData + ']'); + switch (evtData.eventType()) { case EventType.EVT_NODE_JOINED: { ZkDiscoveryNodeJoinEventData evtData0 = (ZkDiscoveryNodeJoinEventData)evtData; @@ -728,6 +770,7 @@ public class ZookeeperDiscoveryImpl { * @param evtData Event data. * @param joiningData Joining node data. */ + @SuppressWarnings("unchecked") private void notifyNodeJoin(ZkDiscoveryNodeJoinEventData evtData, ZkJoiningNodeData joiningData) { ZookeeperClusterNode joinedNode = joiningData.node(); @@ -770,6 +813,8 @@ public class ZookeeperDiscoveryImpl { public void stop() { if (zkClient != null) zkClient.close(); + + joinFut.onDone(new IgniteSpiException("Node stopped")); } /** @@ -813,8 +858,21 @@ public class ZookeeperDiscoveryImpl { private class ConnectionLossListener implements IgniteRunnable { /** {@inheritDoc} */ @Override public void run() { - // TODO ZK + // TODO ZK, can be called from any thread. U.warn(log, "Zookeeper connection loss, local node is SEGMENTED"); + + if (joined) { + assert evts != null; + + lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED, + evts.topVer, + locNode, + Collections.<ClusterNode>emptyList(), + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } + else + joinFut.onDone(new IgniteSpiException("Local node SEGMENTED")); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 0054641..0b9c2e4 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.zk; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; @@ -209,57 +211,20 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * - */ - private void reset() { - System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); - - ZkTestClientCnxnSocketNIO.reset(); - - System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); - - err = false; - - evts.clear(); - } - - /** * @throws Exception If failed. */ - private void checkEventsConsistency() throws Exception { - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { - UUID nodeId = nodeEvtEntry.getKey(); - Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); + public void testStopNode_1() throws Exception { + startGrids(5); - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { - if (!nodeId.equals(nodeEvtEntry0.getKey())) { - Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); + waitForTopology(5); - synchronized (nodeEvts) { - synchronized (nodeEvts0) { - checkEventsConsistency(nodeEvts, nodeEvts0); - } - } - } - } - } - } + stopGrid(3); - /** - * @param evts1 Received events. - * @param evts2 Received events. - */ - private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) { - for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) { - DiscoveryEvent evt1 = e1.getValue(); - DiscoveryEvent evt2 = evts2.get(e1.getKey()); + waitForTopology(4); - if (evt2 != null) { - assertEquals(evt1.topologyVersion(), evt2.topologyVersion()); - assertEquals(evt1.eventNode(), evt2.eventNode()); - assertEquals(evt1.topologyNodes(), evt2.topologyNodes()); - } - } + startGrid(3); + + waitForTopology(5); } /** @@ -391,20 +356,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * @param spi Spi instance. - */ - private void closeZkClient(ZookeeperDiscoverySpi spi) { - ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl.zkClient.zk"); - - try { - zk.close(); - } - catch (Exception e) { - fail("Unexpected error: " + e); - } - } - - /** * @throws Exception If failed. */ public void testConnectionRestore_Coordinator1() throws Exception { @@ -433,8 +384,16 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator4() throws Exception { + connectionRestore_Coordinator(3, 3, 1); + } + + /** * @param initNodes Number of initially started nodes. * @param startNodes Number of nodes to start after coordinator loose connection. + * @param failCnt Number of nodes to stop after coordinator loose connection. * @throws Exception If failed. */ private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception { @@ -464,107 +423,50 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { int cnt = 0; - for (int i = initNodes; i < initNodes + startNodes; i++) { - ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); + DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt]; - spi.waitConnectStart(); + int expEvtCnt = 0; - if (cnt < failCnt) - closeZkClient(spi); - } + sesTimeout = 1000; - c0.allowConnect(); + List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>(); - DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes]; + for (int i = initNodes; i < initNodes + startNodes; i++) { + ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); - for (int i = 0; i < startNodes; i++) - expEvts[i] = joinEvent(initNodes + i + 1); + ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, "impl"); - for (int i = 0; i < initNodes; i++) - checkEvents(ignite(i), expEvts); + impl.waitConnectStart(); - fut.get(); + if (cnt++ < failCnt) { + ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i)); - waitForTopology(initNodes + startNodes - failCnt); - } + c.closeSocket(true); - /** - * @param nodeName Node name. - * @return Node's discovery SPI. - * @throws Exception If failed. - */ - private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception { - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return spis.contains(nodeName); + blockedC.add(c); } - }, 5000); - - ZookeeperDiscoverySpi spi = spis.get(nodeName); - - assertNotNull("Failed to get SPI for node: " + nodeName, spi); - - return spi; - } - - private static DiscoveryEvent joinEvent(long topVer) { - DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); - - expEvt.topologySnapshot(topVer, null); - - return expEvt; - } - - private static DiscoveryEvent failEvent(long topVer) { - DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); - - expEvt.topologySnapshot(topVer, null); - - return expEvt; - } - - /** - * @param node Node. - * @param expEvts Expected events. - * @throws Exception If fialed. - */ - private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception { - checkEvents(node.cluster().localNode().id(), expEvts); - } + else { + expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1); - /** - * @param nodeId Node ID. - * @param expEvts Expected events. - * @throws Exception If failed. - */ - private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); - - if (nodeEvts == null) { - info("No events for node: " + nodeId); + expEvtCnt++; + } + } - return false; - } + Thread.sleep(5000); - synchronized (nodeEvts) { - for (DiscoveryEvent expEvt : expEvts) { - DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion()); + c0.allowConnect(); - if (evt0 == null) { - info("No event for version: " + expEvt.topologyVersion()); + for (ZkTestClientCnxnSocketNIO c : blockedC) + c.allowConnect(); - return false; - } + if (expEvts.length > 0) { + for (int i = 0; i < initNodes; i++) + checkEvents(ignite(i), expEvts); + } - assertEquals(expEvt.type(), evt0.type()); - } - } + fut.get(); - return true; - } - }, 10000)); + waitForTopology(initNodes + startNodes - failCnt); } /** @@ -733,6 +635,153 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * + */ + private void reset() { + System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); + + ZkTestClientCnxnSocketNIO.reset(); + + System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); + + err = false; + + evts.clear(); + } + + /** + * @throws Exception If failed. + */ + private void checkEventsConsistency() throws Exception { + for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { + UUID nodeId = nodeEvtEntry.getKey(); + Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); + + for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { + if (!nodeId.equals(nodeEvtEntry0.getKey())) { + Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); + + synchronized (nodeEvts) { + synchronized (nodeEvts0) { + checkEventsConsistency(nodeEvts, nodeEvts0); + } + } + } + } + } + } + + /** + * @param evts1 Received events. + * @param evts2 Received events. + */ + private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) { + for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) { + DiscoveryEvent evt1 = e1.getValue(); + DiscoveryEvent evt2 = evts2.get(e1.getKey()); + + if (evt2 != null) { + assertEquals(evt1.topologyVersion(), evt2.topologyVersion()); + assertEquals(evt1.eventNode(), evt2.eventNode()); + assertEquals(evt1.topologyNodes(), evt2.topologyNodes()); + } + } + } + + /** + * @param nodeName Node name. + * @return Node's discovery SPI. + * @throws Exception If failed. + */ + private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return spis.contains(nodeName); + } + }, 5000); + + ZookeeperDiscoverySpi spi = spis.get(nodeName); + + assertNotNull("Failed to get SPI for node: " + nodeName, spi); + + return spi; + } + + private static DiscoveryEvent joinEvent(long topVer) { + DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); + + expEvt.topologySnapshot(topVer, null); + + return expEvt; + } + + private static DiscoveryEvent failEvent(long topVer) { + DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); + + expEvt.topologySnapshot(topVer, null); + + return expEvt; + } + + /** + * @param node Node. + * @param expEvts Expected events. + * @throws Exception If fialed. + */ + private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception { + checkEvents(node.cluster().localNode().id(), expEvts); + } + + /** + * @param nodeId Node ID. + * @param expEvts Expected events. + * @throws Exception If failed. + */ + private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); + + if (nodeEvts == null) { + info("No events for node: " + nodeId); + + return false; + } + + synchronized (nodeEvts) { + for (DiscoveryEvent expEvt : expEvts) { + DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion()); + + if (evt0 == null) { + info("No event for version: " + expEvt.topologyVersion()); + + return false; + } + + assertEquals(expEvt.type(), evt0.type()); + } + } + + return true; + } + }, 10000)); + } + + /** + * @param spi Spi instance. + */ + private void closeZkClient(ZookeeperDiscoverySpi spi) { + ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "zkClient", "zk"); + + try { + zk.close(); + } + catch (Exception e) { + fail("Unexpected error: " + e); + } + } + + /** * @param expSize Expected nodes number. * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java index 4a11c68..c8886af 100644 --- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java +++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java @@ -59,6 +59,14 @@ public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { } /** + * @param instanceName Ignite instance name. + * @return ZK client. + */ + public static ZkTestClientCnxnSocketNIO forNode(String instanceName) { + return clients.get(instanceName); + } + + /** * @throws IOException If failed. */ public ZkTestClientCnxnSocketNIO() throws IOException { @@ -69,8 +77,6 @@ public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { nodeName = threadName.substring(threadName.indexOf('-') + 1); log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName); - - clients.put(nodeName, this); } /** {@inheritDoc} */ @@ -93,6 +99,8 @@ public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { } super.connect(addr); + + clients.put(nodeName, this); } /**
