Repository: ignite
Updated Branches:
  refs/heads/ignite-zk d56163c0e -> d1f730789


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1f73078
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1f73078
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1f73078

Branch: refs/heads/ignite-zk
Commit: d1f730789ed718f11b0654fb3c4622cf9725b3db
Parents: d56163c
Author: sboikov <[email protected]>
Authored: Fri Nov 10 12:04:54 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Nov 10 12:04:54 2017 +0300

----------------------------------------------------------------------
 .../tcp/ipfinder/zk/ZKClusterNodeNew.java       |  33 ++++-
 .../org/apache/zookeeper/ZKDisconnectTest.java  | 132 -----------------
 .../org/apache/zookeeper/ZKDisconnectTest1.java | 134 +++++++++++++++++
 .../org/apache/zookeeper/ZKDisconnectTest2.java | 142 +++++++++++++++++++
 4 files changed, 303 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
index bc2620e..d2c448c 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
@@ -27,10 +27,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
@@ -39,12 +41,16 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import sun.reflect.generics.tree.Tree;
 
 /**
  *
  */
 public class ZKClusterNodeNew implements Watcher {
+    private static final Logger LOG = LoggerFactory.getLogger(ZKClusterNodeNew.class);
+
     /** */
     private static final String CLUSTER_PATH = "/cluster";
 
@@ -101,7 +107,7 @@ public class ZKClusterNodeNew implements Watcher {
     }
 
     private void log(String msg) {
-        System.out.println(nodeName + ": " + msg);
+        LOG.info(nodeName + ": " + msg);
     }
 
     @Override public void process(WatchedEvent event) {
@@ -473,11 +479,15 @@ public class ZKClusterNodeNew implements Watcher {
         }
     }
 
-    public void join(String connectString) throws Exception {
+    public void join(String connectString ) throws Exception {
+        join(connectString, 0);
+    }
+
+    public void join(String connectString, long timeout) throws Exception {
         log("Start connect " + connectString);
 
         try {
-            zk = new ZooKeeper(connectString, 5000, this);
+            zk = new ZooKeeper(connectString, 5_000, this);
 
             if (zk.exists(CLUSTER_PATH, false) == null) {
                 List<Op> initOps = new ArrayList<>();
@@ -507,10 +517,21 @@ public class ZKClusterNodeNew implements Watcher {
 
             List<OpResult> res = zk.multi(joinOps);
 
-            connectLatch.await();
+            if (timeout > 0) {
+                if (!connectLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+                    LOG.info("Connect timed out, start failed.");
 
-            System.out.println("Node joined: " + nodeId);
-        } catch (Exception e) {
+                    zk.close();
+
+                    throw new Exception("Connect timed out, start failed.");
+                }
+            }
+            else
+                connectLatch.await();
+
+            log("Node joined: " + nodeId);
+        }
+        catch (Exception e) {
             log("Connect failed: " + e);
 
             e.printStackTrace(System.out);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java
deleted file mode 100644
index fdd9ae9..0000000
--- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.util.concurrent.CountDownLatch;
-import org.apache.curator.test.TestingCluster;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.zk.ZKClusterNode;
-import org.apache.ignite.testframework.GridTestUtils;
-
-import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
-
-/**
- *
- */
-public class ZKDisconnectTest {
-    public static class TestClientCnxnSocketNIO extends ClientCnxnSocketNIO {
-        private static TestClientCnxnSocketNIO instance;
-
-        volatile CountDownLatch blockConnect;
-
-        public TestClientCnxnSocketNIO() throws IOException {
-            super();
-
-            if (instance == null)
-                instance = this;
-        }
-
-        @Override
-        void connect(InetSocketAddress addr) throws IOException {
-            System.out.println("TestClientCnxnSocketNIO connect: " + addr);
-
-            CountDownLatch blockConnect = this.blockConnect;
-
-            if (blockConnect != null) {
-                try {
-                    System.out.println("TestClientCnxnSocketNIO block connected");
-
-                    blockConnect.await();
-
-                    System.out.println("TestClientCnxnSocketNIO finish block");
-                }
-                catch (Exception e) {
-                    e.printStackTrace();
-                }
-
-                this.blockConnect = null;
-            }
-
-            super.connect(addr);
-        }
-
-        void testClose() {
-            try {
-                SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey");
-
-                k.channel().close();
-            }
-            catch (Throwable e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    public static void main(String[] args) {
-        try {
-            TestingCluster zkCluster = new TestingCluster(1);
-            zkCluster.start();
-
-            Thread.sleep(1000);
-
-            System.out.println("ZK started\n");
-
-            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, TestClientCnxnSocketNIO.class.getName());
-
-            ZKClusterNode node1 = new ZKClusterNode("n1");
-            node1.join(zkCluster.getConnectString());
-
-            ZKClusterNode node2 = new ZKClusterNode("n2");
-            node2.join(zkCluster.getConnectString());
-
-            System.out.println("Client connected");
-
-            Thread.sleep(1000);
-
-            System.out.println("Close channel");
-
-            TestClientCnxnSocketNIO.instance.blockConnect = new CountDownLatch(1);
-            TestClientCnxnSocketNIO.instance.testClose();
-
-            System.out.println("Closed");
-
-            ZKClusterNode node3 = new ZKClusterNode("n3");
-            node3.join(zkCluster.getConnectString());
-
-            System.out.println("Node started");
-
-            node3.stop();
-
-            ZKClusterNode node4 = new ZKClusterNode("n4");
-            node4.join(zkCluster.getConnectString());
-
-            System.out.println("Node stopped");
-
-            TestClientCnxnSocketNIO.instance.blockConnect.countDown();
-
-            Thread.sleep(60_000);
-        }
-        catch (Throwable e) {
-            e.printStackTrace(System.out);
-
-            System.exit(1);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java
new file mode 100644
index 0000000..c0d2c8f
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.CountDownLatch;
+import org.apache.curator.test.TestingCluster;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.zk.ZKClusterNodeNew;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ *
+ */
+public class ZKDisconnectTest1 {
+    /** */
+    private static final Logger LOG = LoggerFactory.getLogger(ZKDisconnectTest1.class);
+
+    public static class TestClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+        private static TestClientCnxnSocketNIO instance;
+
+        volatile CountDownLatch blockConnect;
+
+        public TestClientCnxnSocketNIO() throws IOException {
+            super();
+
+            if (instance == null)
+                instance = this;
+        }
+
+        @Override
+        void connect(InetSocketAddress addr) throws IOException {
+            System.out.println("TestClientCnxnSocketNIO connect: " + addr);
+
+            CountDownLatch blockConnect = this.blockConnect;
+
+            if (blockConnect != null) {
+                try {
+                    LOG.info("TestClientCnxnSocketNIO block connected");
+
+                    blockConnect.await();
+
+                    LOG.info("TestClientCnxnSocketNIO finish block");
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+                this.blockConnect = null;
+            }
+
+            super.connect(addr);
+        }
+
+        void testClose() {
+            try {
+                SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey");
+
+                k.channel().close();
+            }
+            catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        try {
+            TestingCluster zkCluster = new TestingCluster(1);
+            zkCluster.start();
+
+            Thread.sleep(1000);
+
+            LOG.info("ZK started\n");
+
+            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, TestClientCnxnSocketNIO.class.getName());
+
+            ZKClusterNodeNew node1 = new ZKClusterNodeNew("n1");
+            node1.join(zkCluster.getConnectString());
+
+            ZKClusterNodeNew node2 = new ZKClusterNodeNew("n2");
+            node2.join(zkCluster.getConnectString());
+
+            LOG.info("Clients connected");
+
+            Thread.sleep(3000);
+
+            LOG.info("Close channel");
+
+            TestClientCnxnSocketNIO.instance.blockConnect = new CountDownLatch(1);
+            TestClientCnxnSocketNIO.instance.testClose();
+
+            LOG.info("Closed");
+
+            ZKClusterNodeNew node3 = new ZKClusterNodeNew("n3");
+            node3.join(zkCluster.getConnectString());
+
+            LOG.info("Node started");
+
+            node3.stop();
+
+            LOG.info("Node stopped");
+
+            TestClientCnxnSocketNIO.instance.blockConnect.countDown();
+
+            Thread.sleep(60_000);
+        }
+        catch (Throwable e) {
+            e.printStackTrace(System.out);
+
+            System.exit(1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java
new file mode 100644
index 0000000..3b36e82
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.curator.test.TestingCluster;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.zk.ZKClusterNodeNew;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ *
+ */
+public class ZKDisconnectTest2 {
+    /** */
+    private static final Logger LOG = LoggerFactory.getLogger(ZKDisconnectTest2.class);
+
+    public static class TestClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+        private static TestClientCnxnSocketNIO instance;
+
+        volatile CountDownLatch blockConnect;
+
+        public TestClientCnxnSocketNIO() throws IOException {
+            super();
+
+            if (instance == null)
+                instance = this;
+        }
+
+        @Override
+        void connect(InetSocketAddress addr) throws IOException {
+            System.out.println("TestClientCnxnSocketNIO connect: " + addr);
+
+            CountDownLatch blockConnect = this.blockConnect;
+
+            if (blockConnect != null) {
+                try {
+                    LOG.info("TestClientCnxnSocketNIO block connected");
+
+                    blockConnect.await();
+
+                    LOG.info("TestClientCnxnSocketNIO finish block");
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+                this.blockConnect = null;
+            }
+
+            super.connect(addr);
+        }
+
+        void testClose() {
+            try {
+                SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey");
+
+                k.channel().close();
+            }
+            catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        try {
+            final TestingCluster zkCluster = new TestingCluster(1);
+            zkCluster.start();
+
+            Thread.sleep(1000);
+
+            LOG.info("ZK started\n");
+
+            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, TestClientCnxnSocketNIO.class.getName());
+
+            ZKClusterNodeNew node1 = new ZKClusterNodeNew("n1");
+            node1.join(zkCluster.getConnectString());
+
+            ZKClusterNodeNew node2 = new ZKClusterNodeNew("n2");
+            node2.join(zkCluster.getConnectString());
+
+            LOG.info("Clients connected");
+
+            Thread.sleep(3000);
+
+            LOG.info("Close channel");
+
+            TestClientCnxnSocketNIO.instance.blockConnect = new CountDownLatch(1);
+            TestClientCnxnSocketNIO.instance.testClose();
+
+            IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ZKClusterNodeNew node3 = new ZKClusterNodeNew("n3");
+                    node3.join(zkCluster.getConnectString(), 2000);
+
+                    return null;
+                }
+            }, "start");
+
+            Thread.sleep(3000);
+
+            LOG.info("Stop block");
+
+            TestClientCnxnSocketNIO.instance.blockConnect.countDown();
+
+            fut.get();
+
+            LOG.info("Done");
+
+            Thread.sleep(60_000);
+        }
+        catch (Throwable e) {
+            e.printStackTrace(System.out);
+
+            System.exit(1);
+        }
+    }
+}

Reply via email to