Repository: ignite Updated Branches: refs/heads/ignite-zk d56163c0e -> d1f730789
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1f73078 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1f73078 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1f73078 Branch: refs/heads/ignite-zk Commit: d1f730789ed718f11b0654fb3c4622cf9725b3db Parents: d56163c Author: sboikov <[email protected]> Authored: Fri Nov 10 12:04:54 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 10 12:04:54 2017 +0300 ---------------------------------------------------------------------- .../tcp/ipfinder/zk/ZKClusterNodeNew.java | 33 ++++- .../org/apache/zookeeper/ZKDisconnectTest.java | 132 ----------------- .../org/apache/zookeeper/ZKDisconnectTest1.java | 134 +++++++++++++++++ .../org/apache/zookeeper/ZKDisconnectTest2.java | 142 +++++++++++++++++++ 4 files changed, 303 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java index bc2620e..d2c448c 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java @@ -27,10 +27,12 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.ClientCnxn; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; @@ -39,12 +41,16 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import sun.reflect.generics.tree.Tree; /** * */ public class ZKClusterNodeNew implements Watcher { + private static final Logger LOG = LoggerFactory.getLogger(ZKClusterNodeNew.class); + /** */ private static final String CLUSTER_PATH = "/cluster"; @@ -101,7 +107,7 @@ public class ZKClusterNodeNew implements Watcher { } private void log(String msg) { - System.out.println(nodeName + ": " + msg); + LOG.info(nodeName + ": " + msg); } @Override public void process(WatchedEvent event) { @@ -473,11 +479,15 @@ public class ZKClusterNodeNew implements Watcher { } } - public void join(String connectString) throws Exception { + public void join(String connectString ) throws Exception { + join(connectString, 0); + } + + public void join(String connectString, long timeout) throws Exception { log("Start connect " + connectString); try { - zk = new ZooKeeper(connectString, 5000, this); + zk = new ZooKeeper(connectString, 5_000, this); if (zk.exists(CLUSTER_PATH, false) == null) { List<Op> initOps = new ArrayList<>(); @@ -507,10 +517,21 @@ public class ZKClusterNodeNew implements Watcher { List<OpResult> res = zk.multi(joinOps); - connectLatch.await(); + if (timeout > 0) { + if (!connectLatch.await(timeout, TimeUnit.MILLISECONDS)) { + LOG.info("Connect timed out, start failed."); - System.out.println("Node joined: " + nodeId); - } catch (Exception e) { + zk.close(); + + throw new Exception("Connect timed out, start failed."); + } + } + else + connectLatch.await(); + + log("Node joined: " + nodeId); + } + catch (Exception e) { log("Connect failed: " + e); e.printStackTrace(System.out); http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java deleted file mode 100644 index fdd9ae9..0000000 --- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.util.concurrent.CountDownLatch; -import org.apache.curator.test.TestingCluster; -import org.apache.ignite.spi.discovery.tcp.ipfinder.zk.ZKClusterNode; -import org.apache.ignite.testframework.GridTestUtils; - -import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; - -/** - * - */ -public class ZKDisconnectTest { - public static class TestClientCnxnSocketNIO extends ClientCnxnSocketNIO { - private static TestClientCnxnSocketNIO instance; - - volatile CountDownLatch blockConnect; - - public TestClientCnxnSocketNIO() throws IOException { - super(); - - if (instance == null) - instance = this; - } - - @Override - void connect(InetSocketAddress addr) throws IOException { - System.out.println("TestClientCnxnSocketNIO connect: " + addr); - - CountDownLatch blockConnect = this.blockConnect; - - if (blockConnect != null) { - try { - System.out.println("TestClientCnxnSocketNIO block connected"); - - blockConnect.await(); - - System.out.println("TestClientCnxnSocketNIO finish block"); - } - catch (Exception e) { - e.printStackTrace(); - } - - this.blockConnect = null; - } - - super.connect(addr); - } - - void testClose() { - try { - SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey"); - - k.channel().close(); - } - catch (Throwable e) { - e.printStackTrace(); - } - } - } - - public static void main(String[] args) { - try { - TestingCluster zkCluster = new TestingCluster(1); - zkCluster.start(); - - Thread.sleep(1000); - - System.out.println("ZK started\n"); - - System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, TestClientCnxnSocketNIO.class.getName()); - - ZKClusterNode node1 = new ZKClusterNode("n1"); - node1.join(zkCluster.getConnectString()); - - ZKClusterNode node2 = new ZKClusterNode("n2"); - node2.join(zkCluster.getConnectString()); - - System.out.println("Client connected"); - - Thread.sleep(1000); - - System.out.println("Close channel"); - - TestClientCnxnSocketNIO.instance.blockConnect = new CountDownLatch(1); - TestClientCnxnSocketNIO.instance.testClose(); - - System.out.println("Closed"); - - ZKClusterNode node3 = new ZKClusterNode("n3"); - node3.join(zkCluster.getConnectString()); - - System.out.println("Node started"); - - node3.stop(); - - ZKClusterNode node4 = new ZKClusterNode("n4"); - node4.join(zkCluster.getConnectString()); - - System.out.println("Node stopped"); - - TestClientCnxnSocketNIO.instance.blockConnect.countDown(); - - Thread.sleep(60_000); - } - catch (Throwable e) { - e.printStackTrace(System.out); - - System.exit(1); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java new file mode 100644 index 0000000..c0d2c8f --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest1.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.util.concurrent.CountDownLatch; +import org.apache.curator.test.TestingCluster; +import org.apache.ignite.spi.discovery.tcp.ipfinder.zk.ZKClusterNodeNew; +import org.apache.ignite.testframework.GridTestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; + +/** + * + */ +public class ZKDisconnectTest1 { + /** */ + private static final Logger LOG = LoggerFactory.getLogger(ZKDisconnectTest1.class); + + public static class TestClientCnxnSocketNIO extends ClientCnxnSocketNIO { + private static TestClientCnxnSocketNIO instance; + + volatile CountDownLatch blockConnect; + + public TestClientCnxnSocketNIO() throws IOException { + super(); + + if (instance == null) + instance = this; + } + + @Override + void connect(InetSocketAddress addr) throws IOException { + System.out.println("TestClientCnxnSocketNIO connect: " + addr); + + CountDownLatch blockConnect = this.blockConnect; + + if (blockConnect != null) { + try { + LOG.info("TestClientCnxnSocketNIO block connected"); + + blockConnect.await(); + + LOG.info("TestClientCnxnSocketNIO finish block"); + } + catch (Exception e) { + e.printStackTrace(); + } + + this.blockConnect = null; + } + + super.connect(addr); + } + + void testClose() { + try { + SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey"); + + k.channel().close(); + } + catch (Throwable e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) { + try { + TestingCluster zkCluster = new TestingCluster(1); + zkCluster.start(); + + Thread.sleep(1000); + + LOG.info("ZK started\n"); + + System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, TestClientCnxnSocketNIO.class.getName()); + + ZKClusterNodeNew node1 = new ZKClusterNodeNew("n1"); + node1.join(zkCluster.getConnectString()); + + ZKClusterNodeNew node2 = new ZKClusterNodeNew("n2"); + node2.join(zkCluster.getConnectString()); + + LOG.info("Clients connected"); + + Thread.sleep(3000); + + LOG.info("Close channel"); + + TestClientCnxnSocketNIO.instance.blockConnect = new CountDownLatch(1); + TestClientCnxnSocketNIO.instance.testClose(); + + LOG.info("Closed"); + + ZKClusterNodeNew node3 = new ZKClusterNodeNew("n3"); + node3.join(zkCluster.getConnectString()); + + LOG.info("Node started"); + + node3.stop(); + + LOG.info("Node stopped"); + + TestClientCnxnSocketNIO.instance.blockConnect.countDown(); + + Thread.sleep(60_000); + } + catch (Throwable e) { + e.printStackTrace(System.out); + + System.exit(1); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d1f73078/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java new file mode 100644 index 0000000..3b36e82 --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest2.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import org.apache.curator.test.TestingCluster; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.spi.discovery.tcp.ipfinder.zk.ZKClusterNodeNew; +import org.apache.ignite.testframework.GridTestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; + +/** + * + */ +public class ZKDisconnectTest2 { + /** */ + private static final Logger LOG = LoggerFactory.getLogger(ZKDisconnectTest2.class); + + public static class TestClientCnxnSocketNIO extends ClientCnxnSocketNIO { + private static TestClientCnxnSocketNIO instance; + + volatile CountDownLatch blockConnect; + + public TestClientCnxnSocketNIO() throws IOException { + super(); + + if (instance == null) + instance = this; + } + + @Override + void connect(InetSocketAddress addr) throws IOException { + System.out.println("TestClientCnxnSocketNIO connect: " + addr); + + CountDownLatch blockConnect = this.blockConnect; + + if (blockConnect != null) { + try { + LOG.info("TestClientCnxnSocketNIO block connected"); + + blockConnect.await(); + + LOG.info("TestClientCnxnSocketNIO finish block"); + } + catch (Exception e) { + e.printStackTrace(); + } + + this.blockConnect = null; + } + + super.connect(addr); + } + + void testClose() { + try { + SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey"); + + k.channel().close(); + } + catch (Throwable e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) { + try { + final TestingCluster zkCluster = new TestingCluster(1); + zkCluster.start(); + + Thread.sleep(1000); + + LOG.info("ZK started\n"); + + System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, TestClientCnxnSocketNIO.class.getName()); + + ZKClusterNodeNew node1 = new ZKClusterNodeNew("n1"); + node1.join(zkCluster.getConnectString()); + + ZKClusterNodeNew node2 = new ZKClusterNodeNew("n2"); + node2.join(zkCluster.getConnectString()); + + LOG.info("Clients connected"); + + Thread.sleep(3000); + + LOG.info("Close channel"); + + TestClientCnxnSocketNIO.instance.blockConnect = new CountDownLatch(1); + TestClientCnxnSocketNIO.instance.testClose(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ZKClusterNodeNew node3 = new ZKClusterNodeNew("n3"); + node3.join(zkCluster.getConnectString(), 2000); + + return null; + } + }, "start"); + + Thread.sleep(3000); + + LOG.info("Stop block"); + + TestClientCnxnSocketNIO.instance.blockConnect.countDown(); + + fut.get(); + + LOG.info("Done"); + + Thread.sleep(60_000); + } + catch (Throwable e) { + e.printStackTrace(System.out); + + System.exit(1); + } + } +}
