Repository: ignite Updated Branches: refs/heads/ignite-zk 8c07bc892 -> 44a81bbb6
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f89dbcf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f89dbcf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f89dbcf Branch: refs/heads/ignite-zk Commit: 1f89dbcfab77206f8ae0c373012edf04c9503a26 Parents: 5674f7f Author: sboikov <sboi...@gridgain.com> Authored: Fri Jan 12 12:53:02 2018 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jan 12 12:59:42 2018 +0300 ---------------------------------------------------------------------- .../zookeeper/ZkTestClientCnxnSocketNIO.java | 137 ------------------- .../zk/internal/ZookeeperDiscoveryImpl.java | 18 ++- .../zk/ZookeeperDiscoverySpiTestSuite2.java | 1 + .../zk/internal/ZookeeperDiscoverySpiTest.java | 75 ++++------ .../zookeeper/ZkTestClientCnxnSocketNIO.java | 137 +++++++++++++++++++ 5 files changed, 177 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java deleted file mode 100644 index 7892b5e..0000000 --- a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.logger.java.JavaLogger; -import org.apache.ignite.testframework.GridTestUtils; - -/** - * - */ -public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { - /** */ - public static final IgniteLogger log = new JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class); - - /** */ - public static volatile boolean DEBUG = false; - - /** */ - public volatile CountDownLatch blockConnectLatch; - - /** */ - public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients = new ConcurrentHashMap<>(); - - /** */ - private final String nodeName; - - /** - * - */ - public static void reset() { - clients.clear(); - } - - /** - * @param node Node. - * @return ZK client. - */ - public static ZkTestClientCnxnSocketNIO forNode(Ignite node) { - return clients.get(node.name()); - } - - /** - * @param instanceName Ignite instance name. - * @return ZK client. - */ - public static ZkTestClientCnxnSocketNIO forNode(String instanceName) { - return clients.get(instanceName); - } - - /** - * @throws IOException If failed. - */ - public ZkTestClientCnxnSocketNIO() throws IOException { - super(); - - String threadName = Thread.currentThread().getName(); - - nodeName = threadName.substring(threadName.indexOf('-') + 1); - - if (DEBUG) - log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName); - } - - /** {@inheritDoc} */ - @Override void connect(InetSocketAddress addr) throws IOException { - CountDownLatch blockConnect = this.blockConnectLatch; - - if (DEBUG) - log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + ", addr=" + addr + ']'); - - if (blockConnect != null && blockConnect.getCount() > 0) { - try { - log.info("ZkTestClientCnxnSocketNIO block connect"); - - blockConnect.await(60, TimeUnit.SECONDS); - - log.info("ZkTestClientCnxnSocketNIO finish block connect"); - } - catch (Exception e) { - log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e); - } - } - - super.connect(addr); - - clients.put(nodeName, this); - } - - /** - * - */ - public void allowConnect() { - assert blockConnectLatch != null && blockConnectLatch.getCount() == 1 : blockConnectLatch; - - log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + ']'); - - blockConnectLatch.countDown(); - } - - /** - * @param blockConnect {@code True} to block client reconnect. - * @throws Exception If failed. - */ - public void closeSocket(boolean blockConnect) throws Exception { - if (blockConnect) - blockConnectLatch = new CountDownLatch(1); - - log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + ", block=" + blockConnect + ']'); - - SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey"); - - k.channel().close(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 8d404f6..361661b 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -188,6 +188,9 @@ public class ZookeeperDiscoveryImpl { /** */ private final AtomicReference<ZkCommunicationErrorProcessFuture> commErrProcFut = new AtomicReference<>(); + /** */ + private long prevSavedEvtsTopVer; + /** * @param spi Discovery SPI. * @param igniteInstanceName Instance name. @@ -2058,8 +2061,19 @@ public class ZookeeperDiscoveryImpl { long time = System.currentTimeMillis() - start; - if (log.isInfoEnabled()) { - log.info("Discovery coordinator saved new topology events [topVer=" + rtState.evtsData.topVer + + if (prevSavedEvtsTopVer != rtState.evtsData.topVer) { + if (log.isInfoEnabled()) { + log.info("Discovery coordinator saved new topology events [topVer=" + rtState.evtsData.topVer + + ", size=" + evtsBytes.length + + ", evts=" + rtState.evtsData.evts.size() + + ", lastEvt=" + rtState.evtsData.evtIdGen + + ", saveTime=" + time + ']'); + } + + prevSavedEvtsTopVer = rtState.evtsData.topVer; + } + else if (log.isDebugEnabled()) { + log.debug("Discovery coordinator saved new topology events [topVer=" + rtState.evtsData.topVer + ", size=" + evtsBytes.length + ", evts=" + rtState.evtsData.evts.size() + ", lastEvt=" + rtState.evtsData.evtIdGen + http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java index d5749af..f5f395b 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java @@ -67,6 +67,7 @@ public class ZookeeperDiscoverySpiTestSuite2 extends TestSuite { return suite; } + /** * Called via reflection by {@link org.apache.ignite.testframework.junits.GridAbstractTest}. * http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index d18db5a..05b1d7a 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -109,6 +109,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestSuite2; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.zookeeper.KeeperException; @@ -145,7 +146,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** */ private static TestingCluster zkCluster; - /** */ + /** To run test with real local ZK. */ private static final boolean USE_TEST_CLUSTER = true; /** */ @@ -359,50 +360,6 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { System.setProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT, "1000"); } - /** - * @param instances Number of instances. - * @return Cluster. - */ - private static TestingCluster createTestingCluster(int instances) { - String tmpDir = System.getProperty("java.io.tmpdir"); - - List<InstanceSpec> specs = new ArrayList<>(); - - for (int i = 0; i < instances; i++) { - File file = new File(tmpDir, "apacheIgniteTestZk-" + i); - - if (file.isDirectory()) - deleteRecursively0(file); - else { - if (!file.mkdirs()) - throw new IgniteException("Failed to create directory for test Zookeeper server: " + file.getAbsolutePath()); - } - - specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, 10_000)); - } - - return new TestingCluster(specs); - } - - /** - * @param file Directory to delete. - */ - private static void deleteRecursively0(File file) { - File[] files = file.listFiles(); - - if (files == null) - return; - - for (File f : files) { - if (f.isDirectory()) - deleteRecursively0(f); - else { - if (!f.delete()) - throw new IgniteException("Failed to delete file: " + f.getAbsolutePath()); - } - } - } - /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { stopZkCluster(); @@ -447,7 +404,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { super.beforeTest(); if (USE_TEST_CLUSTER && zkCluster == null) { - zkCluster = createTestingCluster(ZK_SRVS); + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); zkCluster.start(); } @@ -1070,7 +1027,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { assertTrue(l.await(10, TimeUnit.SECONDS)); } finally { - zkCluster = createTestingCluster(ZK_SRVS); + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); zkCluster.start(); } @@ -1107,7 +1064,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { finally { zkCluster.close(); - zkCluster = createTestingCluster(ZK_SRVS); + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); zkCluster.start(); } @@ -1142,7 +1099,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { finally { zkCluster.close(); - zkCluster = createTestingCluster(ZK_SRVS); + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS); zkCluster.start(); } @@ -1866,14 +1823,28 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testTopologyChangeMultithreaded_RestartZk() throws Exception { - topologyChangeWithRestarts(true, false); + try { + topologyChangeWithRestarts(true, false); + } + finally { + zkCluster.stop(); + + zkCluster = null; + } } /** * @throws Exception If failed. */ public void testTopologyChangeMultithreaded_RestartZk_CloseClients() throws Exception { - topologyChangeWithRestarts(true, true); + try { + topologyChangeWithRestarts(true, true); + } + finally { + zkCluster.stop(); + + zkCluster = null; + } } /** @@ -3619,7 +3590,7 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { sesTimeout = 30_000; - zkCluster = createTestingCluster(3); + zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(3); try { final AtomicInteger idx = new AtomicInteger(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java new file mode 100644 index 0000000..7892b5e --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.logger.java.JavaLogger; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * + */ +public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { + /** */ + public static final IgniteLogger log = new JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class); + + /** */ + public static volatile boolean DEBUG = false; + + /** */ + public volatile CountDownLatch blockConnectLatch; + + /** */ + public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients = new ConcurrentHashMap<>(); + + /** */ + private final String nodeName; + + /** + * + */ + public static void reset() { + clients.clear(); + } + + /** + * @param node Node. + * @return ZK client. + */ + public static ZkTestClientCnxnSocketNIO forNode(Ignite node) { + return clients.get(node.name()); + } + + /** + * @param instanceName Ignite instance name. + * @return ZK client. + */ + public static ZkTestClientCnxnSocketNIO forNode(String instanceName) { + return clients.get(instanceName); + } + + /** + * @throws IOException If failed. + */ + public ZkTestClientCnxnSocketNIO() throws IOException { + super(); + + String threadName = Thread.currentThread().getName(); + + nodeName = threadName.substring(threadName.indexOf('-') + 1); + + if (DEBUG) + log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName); + } + + /** {@inheritDoc} */ + @Override void connect(InetSocketAddress addr) throws IOException { + CountDownLatch blockConnect = this.blockConnectLatch; + + if (DEBUG) + log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + ", addr=" + addr + ']'); + + if (blockConnect != null && blockConnect.getCount() > 0) { + try { + log.info("ZkTestClientCnxnSocketNIO block connect"); + + blockConnect.await(60, TimeUnit.SECONDS); + + log.info("ZkTestClientCnxnSocketNIO finish block connect"); + } + catch (Exception e) { + log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e); + } + } + + super.connect(addr); + + clients.put(nodeName, this); + } + + /** + * + */ + public void allowConnect() { + assert blockConnectLatch != null && blockConnectLatch.getCount() == 1 : blockConnectLatch; + + log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + ']'); + + blockConnectLatch.countDown(); + } + + /** + * @param blockConnect {@code True} to block client reconnect. + * @throws Exception If failed. + */ + public void closeSocket(boolean blockConnect) throws Exception { + if (blockConnect) + blockConnectLatch = new CountDownLatch(1); + + log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + ", block=" + blockConnect + ']'); + + SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey"); + + k.channel().close(); + } +}