Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 8c07bc892 -> 44a81bbb6


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f89dbcf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f89dbcf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f89dbcf

Branch: refs/heads/ignite-zk
Commit: 1f89dbcfab77206f8ae0c373012edf04c9503a26
Parents: 5674f7f
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Jan 12 12:53:02 2018 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Jan 12 12:59:42 2018 +0300

----------------------------------------------------------------------
 .../zookeeper/ZkTestClientCnxnSocketNIO.java    | 137 -------------------
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  18 ++-
 .../zk/ZookeeperDiscoverySpiTestSuite2.java     |   1 +
 .../zk/internal/ZookeeperDiscoverySpiTest.java  |  75 ++++------
 .../zookeeper/ZkTestClientCnxnSocketNIO.java    | 137 +++++++++++++++++++
 5 files changed, 177 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
 
b/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
deleted file mode 100644
index 7892b5e..0000000
--- 
a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.logger.java.JavaLogger;
-import org.apache.ignite.testframework.GridTestUtils;
-
-/**
- *
- */
-public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO {
-    /** */
-    public static final IgniteLogger log = new 
JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class);
-
-    /** */
-    public static volatile boolean DEBUG = false;
-
-    /** */
-    public volatile CountDownLatch blockConnectLatch;
-
-    /** */
-    public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients 
= new ConcurrentHashMap<>();
-
-    /** */
-    private final String nodeName;
-
-    /**
-     *
-     */
-    public static void reset() {
-        clients.clear();
-    }
-
-    /**
-     * @param node Node.
-     * @return ZK client.
-     */
-    public static ZkTestClientCnxnSocketNIO forNode(Ignite node) {
-        return clients.get(node.name());
-    }
-
-    /**
-     * @param instanceName Ignite instance name.
-     * @return ZK client.
-     */
-    public static ZkTestClientCnxnSocketNIO forNode(String instanceName) {
-        return clients.get(instanceName);
-    }
-
-    /**
-     * @throws IOException If failed.
-     */
-    public ZkTestClientCnxnSocketNIO() throws IOException {
-        super();
-
-        String threadName = Thread.currentThread().getName();
-
-        nodeName = threadName.substring(threadName.indexOf('-') + 1);
-
-        if (DEBUG)
-            log.info("ZkTestClientCnxnSocketNIO created for node: " + 
nodeName);
-    }
-
-    /** {@inheritDoc} */
-    @Override void connect(InetSocketAddress addr) throws IOException {
-        CountDownLatch blockConnect = this.blockConnectLatch;
-
-        if (DEBUG)
-            log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + 
", addr=" + addr + ']');
-
-        if (blockConnect != null && blockConnect.getCount() > 0) {
-            try {
-                log.info("ZkTestClientCnxnSocketNIO block connect");
-
-                blockConnect.await(60, TimeUnit.SECONDS);
-
-                log.info("ZkTestClientCnxnSocketNIO finish block connect");
-            }
-            catch (Exception e) {
-                log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e);
-            }
-        }
-
-        super.connect(addr);
-
-        clients.put(nodeName, this);
-    }
-
-    /**
-     *
-     */
-    public void allowConnect() {
-        assert blockConnectLatch != null && blockConnectLatch.getCount() == 1 
: blockConnectLatch;
-
-        log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + 
']');
-
-        blockConnectLatch.countDown();
-    }
-
-    /**
-     * @param blockConnect {@code True} to block client reconnect.
-     * @throws Exception If failed.
-     */
-    public void closeSocket(boolean blockConnect) throws Exception {
-        if (blockConnect)
-            blockConnectLatch = new CountDownLatch(1);
-
-        log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + 
", block=" + blockConnect + ']');
-
-        SelectionKey k = GridTestUtils.getFieldValue(this, 
ClientCnxnSocketNIO.class, "sockKey");
-
-        k.channel().close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 8d404f6..361661b 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -188,6 +188,9 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private final AtomicReference<ZkCommunicationErrorProcessFuture> 
commErrProcFut = new AtomicReference<>();
 
+    /** */
+    private long prevSavedEvtsTopVer;
+
     /**
      * @param spi Discovery SPI.
      * @param igniteInstanceName Instance name.
@@ -2058,8 +2061,19 @@ public class ZookeeperDiscoveryImpl {
 
         long time = System.currentTimeMillis() - start;
 
-        if (log.isInfoEnabled()) {
-            log.info("Discovery coordinator saved new topology events 
[topVer=" + rtState.evtsData.topVer +
+        if (prevSavedEvtsTopVer != rtState.evtsData.topVer) {
+            if (log.isInfoEnabled()) {
+                log.info("Discovery coordinator saved new topology events 
[topVer=" + rtState.evtsData.topVer +
+                    ", size=" + evtsBytes.length +
+                    ", evts=" + rtState.evtsData.evts.size() +
+                    ", lastEvt=" + rtState.evtsData.evtIdGen +
+                    ", saveTime=" + time + ']');
+            }
+
+            prevSavedEvtsTopVer = rtState.evtsData.topVer;
+        }
+        else if (log.isDebugEnabled()) {
+            log.debug("Discovery coordinator saved new topology events 
[topVer=" + rtState.evtsData.topVer +
                 ", size=" + evtsBytes.length +
                 ", evts=" + rtState.evtsData.evts.size() +
                 ", lastEvt=" + rtState.evtsData.evtIdGen +

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
index d5749af..f5f395b 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite2.java
@@ -67,6 +67,7 @@ public class ZookeeperDiscoverySpiTestSuite2 extends 
TestSuite {
 
         return suite;
     }
+
     /**
      * Called via reflection by {@link 
org.apache.ignite.testframework.junits.GridAbstractTest}.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index d18db5a..05b1d7a 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -109,6 +109,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestSuite2;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.zookeeper.KeeperException;
@@ -145,7 +146,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /** */
     private static TestingCluster zkCluster;
 
-    /** */
+    /** To run test with real local ZK. */
     private static final boolean USE_TEST_CLUSTER = true;
 
     /** */
@@ -359,50 +360,6 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         
System.setProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT,
 "1000");
     }
 
-    /**
-     * @param instances Number of instances.
-     * @return Cluster.
-     */
-    private static TestingCluster createTestingCluster(int instances) {
-        String tmpDir = System.getProperty("java.io.tmpdir");
-
-        List<InstanceSpec> specs = new ArrayList<>();
-
-        for (int i = 0; i < instances; i++) {
-            File file = new File(tmpDir, "apacheIgniteTestZk-" + i);
-
-            if (file.isDirectory())
-                deleteRecursively0(file);
-            else {
-                if (!file.mkdirs())
-                    throw new IgniteException("Failed to create directory for 
test Zookeeper server: " + file.getAbsolutePath());
-            }
-
-            specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, 
10_000));
-        }
-
-        return new TestingCluster(specs);
-    }
-
-    /**
-     * @param file Directory to delete.
-     */
-    private static void deleteRecursively0(File file) {
-        File[] files = file.listFiles();
-
-        if (files == null)
-            return;
-
-        for (File f : files) {
-            if (f.isDirectory())
-                deleteRecursively0(f);
-            else {
-                if (!f.delete())
-                    throw new IgniteException("Failed to delete file: " + 
f.getAbsolutePath());
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         stopZkCluster();
@@ -447,7 +404,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         super.beforeTest();
 
         if (USE_TEST_CLUSTER && zkCluster == null) {
-            zkCluster = createTestingCluster(ZK_SRVS);
+            zkCluster = 
ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
 
             zkCluster.start();
         }
@@ -1070,7 +1027,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             assertTrue(l.await(10, TimeUnit.SECONDS));
         }
         finally {
-            zkCluster = createTestingCluster(ZK_SRVS);
+            zkCluster = 
ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
 
             zkCluster.start();
         }
@@ -1107,7 +1064,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         finally {
             zkCluster.close();
 
-            zkCluster = createTestingCluster(ZK_SRVS);
+            zkCluster = 
ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
 
             zkCluster.start();
         }
@@ -1142,7 +1099,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         finally {
             zkCluster.close();
 
-            zkCluster = createTestingCluster(ZK_SRVS);
+            zkCluster = 
ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
 
             zkCluster.start();
         }
@@ -1866,14 +1823,28 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testTopologyChangeMultithreaded_RestartZk() throws Exception {
-        topologyChangeWithRestarts(true, false);
+        try {
+            topologyChangeWithRestarts(true, false);
+        }
+        finally {
+            zkCluster.stop();
+
+            zkCluster = null;
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTopologyChangeMultithreaded_RestartZk_CloseClients() 
throws Exception {
-        topologyChangeWithRestarts(true, true);
+        try {
+            topologyChangeWithRestarts(true, true);
+        }
+        finally {
+            zkCluster.stop();
+
+            zkCluster = null;
+        }
     }
 
     /**
@@ -3619,7 +3590,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
 
         sesTimeout = 30_000;
 
-        zkCluster = createTestingCluster(3);
+        zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(3);
 
         try {
             final AtomicInteger idx = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f89dbcf/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
new file mode 100644
index 0000000..7892b5e
--- /dev/null
+++ 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ *
+ */
+public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+    /** */
+    public static final IgniteLogger log = new 
JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class);
+
+    /** */
+    public static volatile boolean DEBUG = false;
+
+    /** */
+    public volatile CountDownLatch blockConnectLatch;
+
+    /** */
+    public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients 
= new ConcurrentHashMap<>();
+
+    /** */
+    private final String nodeName;
+
+    /**
+     *
+     */
+    public static void reset() {
+        clients.clear();
+    }
+
+    /**
+     * @param node Node.
+     * @return ZK client.
+     */
+    public static ZkTestClientCnxnSocketNIO forNode(Ignite node) {
+        return clients.get(node.name());
+    }
+
+    /**
+     * @param instanceName Ignite instance name.
+     * @return ZK client.
+     */
+    public static ZkTestClientCnxnSocketNIO forNode(String instanceName) {
+        return clients.get(instanceName);
+    }
+
+    /**
+     * @throws IOException If failed.
+     */
+    public ZkTestClientCnxnSocketNIO() throws IOException {
+        super();
+
+        String threadName = Thread.currentThread().getName();
+
+        nodeName = threadName.substring(threadName.indexOf('-') + 1);
+
+        if (DEBUG)
+            log.info("ZkTestClientCnxnSocketNIO created for node: " + 
nodeName);
+    }
+
+    /** {@inheritDoc} */
+    @Override void connect(InetSocketAddress addr) throws IOException {
+        CountDownLatch blockConnect = this.blockConnectLatch;
+
+        if (DEBUG)
+            log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + 
", addr=" + addr + ']');
+
+        if (blockConnect != null && blockConnect.getCount() > 0) {
+            try {
+                log.info("ZkTestClientCnxnSocketNIO block connect");
+
+                blockConnect.await(60, TimeUnit.SECONDS);
+
+                log.info("ZkTestClientCnxnSocketNIO finish block connect");
+            }
+            catch (Exception e) {
+                log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e);
+            }
+        }
+
+        super.connect(addr);
+
+        clients.put(nodeName, this);
+    }
+
+    /**
+     *
+     */
+    public void allowConnect() {
+        assert blockConnectLatch != null && blockConnectLatch.getCount() == 1 
: blockConnectLatch;
+
+        log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + 
']');
+
+        blockConnectLatch.countDown();
+    }
+
+    /**
+     * @param blockConnect {@code True} to block client reconnect.
+     * @throws Exception If failed.
+     */
+    public void closeSocket(boolean blockConnect) throws Exception {
+        if (blockConnect)
+            blockConnectLatch = new CountDownLatch(1);
+
+        log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + 
", block=" + blockConnect + ']');
+
+        SelectionKey k = GridTestUtils.getFieldValue(this, 
ClientCnxnSocketNIO.class, "sockKey");
+
+        k.channel().close();
+    }
+}

Reply via email to