Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 45e7e4060 -> a4be5afd0


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4be5afd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4be5afd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4be5afd

Branch: refs/heads/ignite-zk
Commit: a4be5afd03fef3b3fa8ce8c017d24636859c82f3
Parents: 45e7e40
Author: sboikov <[email protected]>
Authored: Mon Nov 20 13:27:23 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 20 15:30:19 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  10 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  98 ++++--
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 335 +++++++++++--------
 .../zookeeper/ZkTestClientCnxnSocketNIO.java    |  12 +-
 4 files changed, 281 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 92c4f54..75f4f36 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.discovery.JoiningNodesAware;
@@ -258,15 +259,6 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     }
 
     /**
-     * For testing only.
-     *
-     * @throws Exception If failed.
-     */
-    public void waitConnectStart() throws Exception {
-        //connectStart.await();
-    }
-
-    /**
      * @return Local node instance.
      */
     private ZookeeperClusterNode initLocalNode() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 4378c6f..9689762 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import org.apache.curator.utils.PathUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -97,6 +98,12 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private long gridStartTime;
 
+    /** */
+    private long lastProcEvt = -1;
+
+    /** */
+    private boolean joined;
+
     /**
      * @param log
      * @param basePath
@@ -284,15 +291,29 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
AsyncCallback.Children2Callback() {
                 @Override public void processResult(int rc, String path, 
Object ctx, List<String> children, Stat stat) {
-                    nodeConnected(rc, children);
+                    onConnected(rc, children);
                 }
             });
+
+            connStartLatch.countDown();
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper 
nodes", e);
         }
     }
 
+    /** TODO ZK */
+    private final CountDownLatch connStartLatch = new CountDownLatch(1);
+
+    /**
+     * For testing only.
+     *
+     * @throws Exception If failed.
+     */
+    public void waitConnectStart() throws Exception {
+        connStartLatch.await();
+    }
+
     /** */
     private ZkDiscoveryEventsData evts;
 
@@ -303,10 +324,15 @@ public class ZookeeperDiscoveryImpl {
      * @param rc Async callback result.
      * @param aliveNodes Alive nodes.
      */
-    private void nodeConnected(int rc, List<String> aliveNodes) {
+    private void onConnected(int rc, List<String> aliveNodes) {
+        assert !joined;
+
+        checkIsCoordinator(rc, aliveNodes);
+    }
+
+    private void checkIsCoordinator(int rc, List<String> aliveNodes) {
         try {
             assert rc == 0 : rc;
-            assert !joined;
 
             TreeMap<Integer, String> alives = new TreeMap<>();
 
@@ -339,13 +365,18 @@ public class ZookeeperDiscoveryImpl {
 
                 assert prevE != null;
 
+                final int crdInternalId = crdE.getKey();
                 final int locInternalId0 = locInternalId;
 
+                log.info("Discovery coordinator already exists, watch for 
previous node [" +
+                    "locId=" + locNode.id() +
+                    ", prevPath=" + prevE.getValue() + ']');
+
                 zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), new Watcher() {
                     @Override public void process(WatchedEvent evt) {
                         if (evt.getType() == Event.EventType.NodeDeleted) {
                             try {
-                                onPreviousCoordinatorFail(locInternalId0);
+                                onPreviousNodeFail(crdInternalId, 
locInternalId0);
                             }
                             catch (Throwable e) {
                                 onFatalError(e);
@@ -358,7 +389,7 @@ public class ZookeeperDiscoveryImpl {
 
                         if (stat == null) {
                             try {
-                                onPreviousCoordinatorFail(locInternalId0);
+                                onPreviousNodeFail(crdInternalId, 
locInternalId0);
                             }
                             catch (Throwable e) {
                                 onFatalError(e);
@@ -373,11 +404,23 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    private void onPreviousCoordinatorFail(int locInternalId) throws Exception 
{
-        if (log.isInfoEnabled())
-            log.info("Previous discovery coordinator failed [locId=" + 
locNode.id() + ']');
+    private void onPreviousNodeFail(int crdInternalId, int locInternalId) 
throws Exception {
+        if (locInternalId == crdInternalId + 1) {
+            if (log.isInfoEnabled())
+                log.info("Previous discovery coordinator failed [locId=" + 
locNode.id() + ']');
+
+            onBecomeCoordinator(locInternalId);
+        }
+        else {
+            if (log.isInfoEnabled())
+                log.info("Previous node failed, check is node new coordinator 
[locId=" + locNode.id() + ']');
 
-        onBecomeCoordinator(locInternalId);
+            zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
AsyncCallback.Children2Callback() {
+                @Override public void processResult(int rc, String path, 
Object ctx, List<String> children, Stat stat) {
+                    checkIsCoordinator(rc, children);
+                }
+            });
+        }
     }
 
     private void onBecomeCoordinator(int locInternalId) throws Exception {
@@ -408,6 +451,9 @@ public class ZookeeperDiscoveryImpl {
     private void generateTopologyEvents(List<String> aliveNodes) throws 
Exception {
         assert crd;
 
+        if (log.isInfoEnabled())
+            log.info("Process alive nodes change: " + aliveNodes);
+
         TreeMap<Integer, String> alives = new TreeMap<>();
 
         TreeMap<Long, ZookeeperClusterNode> curTop = new 
TreeMap<>(top.nodesByOrder);
@@ -599,12 +645,6 @@ public class ZookeeperDiscoveryImpl {
         generateTopologyEvents(children);
     }
 
-    /** */
-    private long lastProcEvt = -1;
-
-    /** */
-    private boolean joined;
-
     private void onEventsUpdate(byte[] data, Stat stat) throws Exception {
         if (data.length == 0)
             return;
@@ -627,10 +667,6 @@ public class ZookeeperDiscoveryImpl {
         for (Map.Entry<Long, ZkDiscoveryEventData> e : 
evts.tailMap(lastProcEvt, false).entrySet()) {
             ZkDiscoveryEventData evtData = e.getValue();
 
-            if (log.isInfoEnabled()) {
-                log.info("New discovery event data: " + evtData + ']');
-            }
-
             if (!joined) {
                 if (evtData.eventType() != EventType.EVT_NODE_JOINED)
                     continue;
@@ -643,6 +679,9 @@ public class ZookeeperDiscoveryImpl {
                     locNode.id().equals(joinedId);
 
                 if (locJoin) {
+                    if (log.isInfoEnabled())
+                        log.info("Local join event data: " + evtData + ']');
+
                     String path = zkPaths.evtsPath + "/" + 
evtData.topologyVersion() + "/joined";
 
                     ZkJoinEventDataForJoined dataForJoined = 
unmarshal(zkClient.getData(path));
@@ -680,6 +719,9 @@ public class ZookeeperDiscoveryImpl {
                 }
             }
             else {
+                if (log.isInfoEnabled())
+                    log.info("New discovery event data: " + evtData + ']');
+
                 switch (evtData.eventType()) {
                     case EventType.EVT_NODE_JOINED: {
                         ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
@@ -728,6 +770,7 @@ public class ZookeeperDiscoveryImpl {
      * @param evtData Event data.
      * @param joiningData Joining node data.
      */
+    @SuppressWarnings("unchecked")
     private void notifyNodeJoin(ZkDiscoveryNodeJoinEventData evtData, 
ZkJoiningNodeData joiningData) {
         ZookeeperClusterNode joinedNode = joiningData.node();
 
@@ -770,6 +813,8 @@ public class ZookeeperDiscoveryImpl {
     public void stop() {
         if (zkClient != null)
             zkClient.close();
+
+        joinFut.onDone(new IgniteSpiException("Node stopped"));
     }
 
     /**
@@ -813,8 +858,21 @@ public class ZookeeperDiscoveryImpl {
     private class ConnectionLossListener implements IgniteRunnable {
         /** {@inheritDoc} */
         @Override public void run() {
-            // TODO ZK
+            // TODO ZK, can be called from any thread.
             U.warn(log, "Zookeeper connection loss, local node is SEGMENTED");
+
+            if (joined) {
+                assert evts != null;
+
+                lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+                    evts.topVer,
+                    locNode,
+                    Collections.<ClusterNode>emptyList(),
+                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                    null);
+            }
+            else
+                joinFut.onDone(new IgniteSpiException("Local node SEGMENTED"));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 0054641..0b9c2e4 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.zk;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
@@ -209,57 +211,20 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     *
-     */
-    private void reset() {
-        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
-
-        ZkTestClientCnxnSocketNIO.reset();
-
-        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
-
-        err = false;
-
-        evts.clear();
-    }
-
-    /**
      * @throws Exception If failed.
      */
-    private void checkEventsConsistency() throws Exception {
-        for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : 
evts.entrySet()) {
-            UUID nodeId = nodeEvtEntry.getKey();
-            Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue();
+    public void testStopNode_1() throws Exception {
+        startGrids(5);
 
-            for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : 
evts.entrySet()) {
-                if (!nodeId.equals(nodeEvtEntry0.getKey())) {
-                    Map<Long, DiscoveryEvent> nodeEvts0 = 
nodeEvtEntry0.getValue();
+        waitForTopology(5);
 
-                    synchronized (nodeEvts) {
-                        synchronized (nodeEvts0) {
-                            checkEventsConsistency(nodeEvts, nodeEvts0);
-                        }
-                    }
-                }
-            }
-        }
-    }
+        stopGrid(3);
 
-    /**
-     * @param evts1 Received events.
-     * @param evts2 Received events.
-     */
-    private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, 
Map<Long, DiscoveryEvent> evts2) {
-        for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) {
-            DiscoveryEvent evt1 = e1.getValue();
-            DiscoveryEvent evt2 = evts2.get(e1.getKey());
+        waitForTopology(4);
 
-            if (evt2 != null) {
-                assertEquals(evt1.topologyVersion(), evt2.topologyVersion());
-                assertEquals(evt1.eventNode(), evt2.eventNode());
-                assertEquals(evt1.topologyNodes(), evt2.topologyNodes());
-            }
-        }
+        startGrid(3);
+
+        waitForTopology(5);
     }
 
     /**
@@ -391,20 +356,6 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * @param spi Spi instance.
-     */
-    private void closeZkClient(ZookeeperDiscoverySpi spi) {
-        ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl.zkClient.zk");
-
-        try {
-            zk.close();
-        }
-        catch (Exception e) {
-            fail("Unexpected error: " + e);
-        }
-    }
-
-    /**
      * @throws Exception If failed.
      */
     public void testConnectionRestore_Coordinator1() throws Exception {
@@ -433,8 +384,16 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator4() throws Exception {
+        connectionRestore_Coordinator(3, 3, 1);
+    }
+
+    /**
      * @param initNodes Number of initially started nodes.
      * @param startNodes Number of nodes to start after coordinator loose 
connection.
+     * @param failCnt Number of nodes to stop after coordinator loses 
connection.
      * @throws Exception If failed.
      */
     private void connectionRestore_Coordinator(int initNodes, int startNodes, 
int failCnt) throws Exception {
@@ -464,107 +423,50 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
         int cnt = 0;
 
-        for (int i = initNodes; i < initNodes + startNodes; i++) {
-            ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
+        DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt];
 
-            spi.waitConnectStart();
+        int expEvtCnt = 0;
 
-            if (cnt < failCnt)
-                closeZkClient(spi);
-        }
+        sesTimeout = 1000;
 
-        c0.allowConnect();
+        List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>();
 
-        DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes];
+        for (int i = initNodes; i < initNodes + startNodes; i++) {
+            ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
 
-        for (int i = 0; i < startNodes; i++)
-            expEvts[i] = joinEvent(initNodes + i + 1);
+            ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, 
"impl");
 
-        for (int i = 0; i < initNodes; i++)
-            checkEvents(ignite(i), expEvts);
+            impl.waitConnectStart();
 
-        fut.get();
+            if (cnt++ < failCnt) {
+                ZkTestClientCnxnSocketNIO c = 
ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i));
 
-        waitForTopology(initNodes + startNodes - failCnt);
-    }
+                c.closeSocket(true);
 
-    /**
-     * @param nodeName Node name.
-     * @return Node's discovery SPI.
-     * @throws Exception If failed.
-     */
-    private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws 
Exception {
-        GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return spis.contains(nodeName);
+                blockedC.add(c);
             }
-        }, 5000);
-
-        ZookeeperDiscoverySpi spi = spis.get(nodeName);
-
-        assertNotNull("Failed to get SPI for node: " + nodeName, spi);
-
-        return spi;
-    }
-
-    private static DiscoveryEvent joinEvent(long topVer) {
-        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_JOINED, null);
-
-        expEvt.topologySnapshot(topVer, null);
-
-        return expEvt;
-    }
-
-    private static DiscoveryEvent failEvent(long topVer) {
-        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_FAILED, null);
-
-        expEvt.topologySnapshot(topVer, null);
-
-        return expEvt;
-    }
-
-    /**
-     * @param node Node.
-     * @param expEvts Expected events.
-     * @throws Exception If fialed.
-     */
-    private void checkEvents(final Ignite node, final 
DiscoveryEvent...expEvts) throws Exception {
-        checkEvents(node.cluster().localNode().id(), expEvts);
-    }
+            else {
+                expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
 
-    /**
-     * @param nodeId Node ID.
-     * @param expEvts Expected events.
-     * @throws Exception If failed.
-     */
-    private void checkEvents(final UUID nodeId, final 
DiscoveryEvent...expEvts) throws Exception {
-        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId);
-
-                if (nodeEvts == null) {
-                    info("No events for node: " + nodeId);
+                expEvtCnt++;
+            }
+        }
 
-                    return false;
-                }
+        Thread.sleep(5000);
 
-                synchronized (nodeEvts) {
-                    for (DiscoveryEvent expEvt : expEvts) {
-                        DiscoveryEvent evt0 = 
nodeEvts.get(expEvt.topologyVersion());
+        c0.allowConnect();
 
-                        if (evt0 == null) {
-                            info("No event for version: " + 
expEvt.topologyVersion());
+        for (ZkTestClientCnxnSocketNIO c : blockedC)
+            c.allowConnect();
 
-                            return false;
-                        }
+        if (expEvts.length > 0) {
+            for (int i = 0; i < initNodes; i++)
+                checkEvents(ignite(i), expEvts);
+        }
 
-                        assertEquals(expEvt.type(), evt0.type());
-                    }
-                }
+        fut.get();
 
-                return true;
-            }
-        }, 10000));
+        waitForTopology(initNodes + startNodes - failCnt);
     }
 
     /**
@@ -733,6 +635,153 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     *
+     */
+    private void reset() {
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        ZkTestClientCnxnSocketNIO.reset();
+
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+
+        err = false;
+
+        evts.clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkEventsConsistency() throws Exception {
+        for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : 
evts.entrySet()) {
+            UUID nodeId = nodeEvtEntry.getKey();
+            Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue();
+
+            for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : 
evts.entrySet()) {
+                if (!nodeId.equals(nodeEvtEntry0.getKey())) {
+                    Map<Long, DiscoveryEvent> nodeEvts0 = 
nodeEvtEntry0.getValue();
+
+                    synchronized (nodeEvts) {
+                        synchronized (nodeEvts0) {
+                            checkEventsConsistency(nodeEvts, nodeEvts0);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param evts1 Received events.
+     * @param evts2 Received events.
+     */
+    private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, 
Map<Long, DiscoveryEvent> evts2) {
+        for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) {
+            DiscoveryEvent evt1 = e1.getValue();
+            DiscoveryEvent evt2 = evts2.get(e1.getKey());
+
+            if (evt2 != null) {
+                assertEquals(evt1.topologyVersion(), evt2.topologyVersion());
+                assertEquals(evt1.eventNode(), evt2.eventNode());
+                assertEquals(evt1.topologyNodes(), evt2.topologyNodes());
+            }
+        }
+    }
+
+    /**
+     * @param nodeName Node name.
+     * @return Node's discovery SPI.
+     * @throws Exception If failed.
+     */
+    private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws 
Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return spis.contains(nodeName);
+            }
+        }, 5000);
+
+        ZookeeperDiscoverySpi spi = spis.get(nodeName);
+
+        assertNotNull("Failed to get SPI for node: " + nodeName, spi);
+
+        return spi;
+    }
+
+    private static DiscoveryEvent joinEvent(long topVer) {
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_JOINED, null);
+
+        expEvt.topologySnapshot(topVer, null);
+
+        return expEvt;
+    }
+
+    private static DiscoveryEvent failEvent(long topVer) {
+        DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_FAILED, null);
+
+        expEvt.topologySnapshot(topVer, null);
+
+        return expEvt;
+    }
+
+    /**
+     * @param node Node.
+     * @param expEvts Expected events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final Ignite node, final 
DiscoveryEvent...expEvts) throws Exception {
+        checkEvents(node.cluster().localNode().id(), expEvts);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param expEvts Expected events.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(final UUID nodeId, final 
DiscoveryEvent...expEvts) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId);
+
+                if (nodeEvts == null) {
+                    info("No events for node: " + nodeId);
+
+                    return false;
+                }
+
+                synchronized (nodeEvts) {
+                    for (DiscoveryEvent expEvt : expEvts) {
+                        DiscoveryEvent evt0 = 
nodeEvts.get(expEvt.topologyVersion());
+
+                        if (evt0 == null) {
+                            info("No event for version: " + 
expEvt.topologyVersion());
+
+                            return false;
+                        }
+
+                        assertEquals(expEvt.type(), evt0.type());
+                    }
+                }
+
+                return true;
+            }
+        }, 10000));
+    }
+
+    /**
+     * @param spi Spi instance.
+     */
+    private void closeZkClient(ZookeeperDiscoverySpi spi) {
+        ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "zkClient", 
"zk");
+
+        try {
+            zk.close();
+        }
+        catch (Exception e) {
+            fail("Unexpected error: " + e);
+        }
+    }
+
+    /**
      * @param expSize Expected nodes number.
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4be5afd/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
index 4a11c68..c8886af 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
@@ -59,6 +59,14 @@ public class ZkTestClientCnxnSocketNIO extends 
ClientCnxnSocketNIO {
     }
 
     /**
+     * @param instanceName Ignite instance name.
+     * @return ZK client.
+     */
+    public static ZkTestClientCnxnSocketNIO forNode(String instanceName) {
+        return clients.get(instanceName);
+    }
+
+    /**
      * @throws IOException If failed.
      */
     public ZkTestClientCnxnSocketNIO() throws IOException {
@@ -69,8 +77,6 @@ public class ZkTestClientCnxnSocketNIO extends 
ClientCnxnSocketNIO {
         nodeName = threadName.substring(threadName.indexOf('-') + 1);
 
         log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName);
-
-        clients.put(nodeName, this);
     }
 
     /** {@inheritDoc} */
@@ -93,6 +99,8 @@ public class ZkTestClientCnxnSocketNIO extends 
ClientCnxnSocketNIO {
         }
 
         super.connect(addr);
+
+        clients.put(nodeName, this);
     }
 
     /**

Reply via email to