zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ed2564a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ed2564a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ed2564a

Branch: refs/heads/ignite-zk
Commit: 6ed2564a8d68e651cb776e13302d62f415938bea
Parents: 9970b95
Author: sboikov <[email protected]>
Authored: Mon Nov 13 14:24:54 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 13 14:24:54 2017 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  | 18 ++++----
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 47 ++++++++++++++------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 36 ++++++++++++++-
 3 files changed, 77 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 49425ce..04683ac 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -478,19 +478,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 if (rmtNode == null) {
                     DiscoverySpi discoverySpi = 
ignite().configuration().getDiscoverySpi();
 
-                    assert discoverySpi instanceof TcpDiscoverySpi;
-
-                    TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) 
discoverySpi;
+                    boolean unknownNode = true;
 
-                    ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
+                    if (discoverySpi instanceof TcpDiscoverySpi) {
+                        TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) 
discoverySpi;
 
-                    boolean unknownNode = true;
+                        ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
 
-                    if (node0 != null) {
-                        assert node0.isClient() : node0;
+                        if (node0 != null) {
+                            assert node0.isClient() : node0;
 
-                        if 
(node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
-                            unknownNode = false;
+                            if 
(node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
+                                unknownNode = false;
+                        }
                     }
 
                     if (unknownNode) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index f36a6e2..85b0aa5 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
@@ -36,6 +37,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -415,9 +417,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
             List<OpResult> res = zk.multi(joinOps);
 
-            log.info("Waiting for local join event.");
+            log.info("Waiting for local join event [nodeId=" + locNode.id() + 
", name=" + igniteInstanceName + ']');
+
+            for(;;) {
+                if (!joinLatch.await(10, TimeUnit.SECONDS)) {
+                    U.warn(log, "Waiting for local join event [nodeId=" + 
locNode.id() + ", name=" + igniteInstanceName + ']');
+                }
+                else
+                    break;
+            }
 
-            joinLatch.await();
         }
         catch (Exception e) {
             throw new IgniteSpiException(e);
@@ -572,13 +581,15 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
                     if (joinHist.put(data.order, data) == null) {
                         try {
+                            log.info("New joined node data: " + data);
+
                             byte[] bytes = zk.getData(path + "/" + child, 
null, null);
 
                             assert bytes.length > 0;
 
                             ZKJoiningNodeData joinData = unmarshal(bytes);
 
-                            assert joinData.node != null;
+                            assert joinData.node != null && 
joinData.joiningNodeData != null : joinData;
 
                             joinData.node.order(data.order);
 
@@ -697,6 +708,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
                         joinData.node,
                         new ArrayList<>(curTop.values()));
 
+                    log.info("ZK event [type=JOIN, node=" + joinData.node.id() 
+ ", ver=" + v + ']');
+
                     if (!joinData.node.id().equals(locNode.nodeId)) {
                         DiscoveryDataBag joiningNodeBag = new 
DiscoveryDataBag(joinData.node.id());
 
@@ -720,6 +733,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
                         assert failedNode != null : data.order;
 
+                        log.info("ZK event [type=FAIL, node=" + 
failedNode.id() + ", ver=" + v + ']');
+
                         evts.put(v, new 
ZKDiscoveryEvent(EventType.EVT_NODE_FAILED,
                             v,
                             failedNode,
@@ -735,6 +750,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
                             assert failedNode != null : oldData.order;
 
+                            log.info("ZK event [type=FAIL, node=" + 
failedNode.id() + ", ver=" + v + ']');
+
                             evts.put(v, new 
ZKDiscoveryEvent(EventType.EVT_NODE_FAILED,
                                 v,
                                 failedNode,
@@ -747,7 +764,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             }
         }
 
-        log.info("Generated discovery events on coordinator: " + evts);
+        log.info("Generated discovery events on coordinator [vers=" + 
evts.keySet() + ", evts=" + evts + ']');
 
         ZKDiscoveryEvents newEvents;
 
@@ -761,9 +778,13 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
         else {
             TreeMap<Integer, ZKDiscoveryEvent> evts0 = new 
TreeMap<>(curCrdEvts.evts);
 
-            evts0.putAll(evts);
+            for (ZKDiscoveryEvent e : evts.values()) {
+                assert !evts0.containsKey(e.topVer) : "[newEvt=" + e + ", 
oldEvt=" + evts0.get(e.topVer) + ']';
 
-            newEvents = new ZKDiscoveryEvents(newNodes, evts);
+                evts0.put(e.topVer, e);
+            }
+
+            newEvents = new ZKDiscoveryEvents(newNodes, evts0);
 
             expVer = curCrdEvts.ver;
         }
@@ -774,9 +795,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             zk.setData(EVENTS_PATH, marshal(newEvents), expVer);
         }
         catch (Exception e) {
-            log.info("Events update error: " + e);
-
-            e.printStackTrace(System.out);
+            U.error(log, "Events update error: " + e, e);
         }
 
         curCrdEvts = newEvents;
@@ -953,19 +972,19 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     static class ZKDiscoveryEvent implements Serializable {
         /** */
         @GridToStringInclude
-        final int evtType;
+        final int topVer;
 
         /** */
         @GridToStringInclude
-        final ZookeeperClusterNode node;
+        final int evtType;
 
         /** */
         @GridToStringInclude
-        final List<ZookeeperClusterNode> allNodes;
+        final ZookeeperClusterNode node;
 
         /** */
-        @GridToStringInclude
-        final int topVer;
+        @GridToStringExclude
+        final List<ZookeeperClusterNode> allNodes;
 
         /** */
         Map<Integer, Serializable> joiningNodeData;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 6e6c528..5e1a200 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -42,6 +42,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /** */
     private static final boolean USE_TEST_CLUSTER = true;
 
+    /** */
+    private boolean client;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -68,6 +71,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         cfg.setMarshaller(new JdkMarshaller());
 
+        cfg.setClientMode(client);
+
         return cfg;
     }
 
@@ -155,7 +160,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         for (Ignite node : G.allGrids())
             node.compute().broadcast(new DummyCallable(null));
 
-        awaitPartitionMapExchange();
+        //awaitPartitionMapExchange();
     }
 
     /**
@@ -194,6 +199,35 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testStartStopWithClients() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        client = true;
+
+        final int THREADS = 30;
+
+        for (int i = 0; i < 5; i++) {
+            info("Iteration: " + i);
+
+            startGridsMultiThreaded(SRVS, THREADS);
+
+            waitForTopology(SRVS + THREADS);
+
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    stopGrid(idx + 3);
+                }
+            }, THREADS, "stop-node");
+
+            waitForTopology(SRVS);
+        }
+    }
+
+    /**
      * @param expSize Expected nodes number.
      * @throws Exception If failed.
      */

Reply via email to