Repository: ignite Updated Branches: refs/heads/ignite-zk 8bd1e077a -> 42bbed0ad
http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java deleted file mode 100644 index 8aac456..0000000 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.curator.test.TestingCluster; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteRunnable; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; - -/** - * - */ -public class ZookeeperClientTest extends GridCommonAbstractTest { - /** */ - private TestingCluster zkCluster; - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - closeZK(); - - super.afterTest(); - } - - /** - * @throws Exception If failed. - */ - public void testDeleteAll() throws Exception { - startZK(1); - - ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null); - - client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT); - client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT); - client.createIfNeeded("/apacheIgnite/2", null, CreateMode.PERSISTENT); - - client.deleteAll("/apacheIgnite", Arrays.asList("1", "2"), -1); - - assertTrue(client.getChildren("/apacheIgnite").isEmpty()); - - client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT); - client.deleteAll("/apacheIgnite", Arrays.asList("1"), -1); - - assertTrue(client.getChildren("/apacheIgnite").isEmpty()); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionLoss1() throws Exception { - ZookeeperClient client = new ZookeeperClient(log, "localhost:2200", 3000, null); - - try { - client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT); - - fail(); - } - catch (ZookeeperClientFailedException e) { - info("Expected error: " + e); - } - } - - /** - * @throws Exception If failed. - */ - public void testConnectionLoss2() throws Exception { - startZK(1); - - ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - closeZK(); - - try { - client.createIfNeeded("/apacheIgnite2", null, CreateMode.PERSISTENT); - - fail(); - } - catch (ZookeeperClientFailedException e) { - info("Expected error: " + e); - } - } - - /** - * @throws Exception If failed. - */ - public void testConnectionLoss3() throws Exception { - startZK(1); - - CallbackFuture cb = new CallbackFuture(); - - ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, cb); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - closeZK(); - - final AtomicBoolean res = new AtomicBoolean(); - - client.getChildrenAsync("/apacheIgnite1", null, new AsyncCallback.Children2Callback() { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (rc == 0) - res.set(true); - } - }); - - cb.get(10_000); - - assertFalse(res.get()); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionLoss4() throws Exception { - startZK(1); - - CallbackFuture cb = new CallbackFuture(); - - final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, cb); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - final CountDownLatch l = new CountDownLatch(1); - - client.getChildrenAsync("/apacheIgnite1", null, new AsyncCallback.Children2Callback() { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - closeZK(); - - try { - client.createIfNeeded("/apacheIgnite2", null, CreateMode.PERSISTENT); - } - catch (ZookeeperClientFailedException e) { - info("Expected error: " + e); - - l.countDown(); - } - catch (Exception e) { - fail("Unexpected error: " + e); - } - } - }); - - assertTrue(l.await(10, TimeUnit.SECONDS)); - - cb.get(); - } - - /** - * @throws Exception If failed. - */ - public void testReconnect1() throws Exception { - startZK(1); - - ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 30_000, null); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - zkCluster.getServers().get(0).stop(); - - IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - U.sleep(2000); - - info("Restart zookeeper server"); - - zkCluster.getServers().get(0).restart(); - - info("Zookeeper server restarted"); - - return null; - } - }, "start-zk"); - - client.createIfNeeded("/apacheIgnite2", null, CreateMode.PERSISTENT); - - fut.get(); - } - - /** - * @throws Exception If failed. - */ - public void testReconnect1_Callback() throws Exception { - startZK(1); - - ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 30_000, null); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - zkCluster.getServers().get(0).stop(); - - final CountDownLatch l = new CountDownLatch(1); - - client.getChildrenAsync("/apacheIgnite1", null, new AsyncCallback.Children2Callback() { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - info("Callback: " + rc); - - if (rc == 0) - l.countDown(); - } - }); - - IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - U.sleep(2000); - - info("Restart zookeeper server"); - - zkCluster.getServers().get(0).restart(); - - info("Zookeeper server restarted"); - - return null; - } - }, "start-zk"); - - assertTrue(l.await(10, TimeUnit.SECONDS)); - - fut.get(); - } - - /** - * @throws Exception If failed. - */ - public void testReconnect1_InCallback() throws Exception { - startZK(1); - - final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 30_000, null); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - final CountDownLatch l = new CountDownLatch(1); - - client.getChildrenAsync("/apacheIgnite1", null, new AsyncCallback.Children2Callback() { - @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - try { - zkCluster.getServers().get(0).stop(); - - IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - U.sleep(2000); - - info("Restart zookeeper server"); - - zkCluster.getServers().get(0).restart(); - - info("Zookeeper server restarted"); - - return null; - } - }, "start-zk"); - - client.createIfNeeded("/apacheIgnite2", null, CreateMode.PERSISTENT); - - l.countDown(); - - fut.get(); - } - catch (Exception e) { - fail("Unexpected error: " + e); - } - } - }); - - assertTrue(l.await(10, TimeUnit.SECONDS)); - } - - /** - * @throws Exception If failed. - */ - public void testReconnect2() throws Exception { - startZK(1); - - ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 30_000, null); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - zkCluster.getServers().get(0).restart(); - - client.createIfNeeded("/apacheIgnite2", null, CreateMode.PERSISTENT); - } - - /** - * @throws Exception If failed. - */ - public void testReconnect3() throws Exception { - startZK(3); - - ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 30_000, null); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - for (int i = 0; i < 30; i++) { - int idx = rnd.nextInt(3); - - zkCluster.getServers().get(idx).restart(); - - doSleep(rnd.nextLong(100) + 1); - - client.createIfNeeded("/apacheIgnite" + i, null, CreateMode.PERSISTENT); - } - } - - /** - * @throws Exception If failed. - */ - public void testReconnect4() throws Exception { - startZK(3); - - ZookeeperClient client = new ZookeeperClient(log, - zkCluster.getServers().get(2).getInstanceSpec().getConnectString(), - 30_000, - null); - - client.createIfNeeded("/apacheIgnite1", null, CreateMode.PERSISTENT); - - zkCluster.getServers().get(0).stop(); - zkCluster.getServers().get(1).stop(); - - IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - U.sleep(2000); - - info("Restart zookeeper server"); - - zkCluster.getServers().get(0).restart(); - - info("Zookeeper server restarted"); - - return null; - } - }, "start-zk"); - - client.createIfNeeded("/apacheIgnite2", null, CreateMode.PERSISTENT); - - fut.get(); - } - - /** - * @param instances Number of servers in ZK ensemble. - * @throws Exception If failed. - */ - private void startZK(int instances) throws Exception { - assert zkCluster == null; - - zkCluster = new TestingCluster(instances); - - zkCluster.start(); - } - - /** - * - */ - private void closeZK() { - if (zkCluster != null) { - try { - zkCluster.close(); - } - catch (Exception e) { - U.error(log, "Failed to stop Zookeeper client: " + e, e); - } - - zkCluster = null; - } - } - - /** - * - */ - private static class CallbackFuture extends GridFutureAdapter<Void> implements IgniteRunnable { - /** {@inheritDoc} */ - @Override public void run() { - onDone(); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java deleted file mode 100644 index 0df9892..0000000 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ /dev/null @@ -1,994 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.curator.test.TestingCluster; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteCallable; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; -import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient; -import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; -import org.apache.zookeeper.ZooKeeper; - -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD; -import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; - -/** - * - */ -public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { - /** */ - private TestingCluster zkCluster; - - /** */ - private static final boolean USE_TEST_CLUSTER = true; - - /** */ - private boolean client; - - /** */ - private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>(); - - /** */ - private static volatile boolean err; - - /** */ - private boolean testSockNio; - - /** */ - private int sesTimeout; - - /** */ - private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - if (testSockNio) - System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName()); - - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setConsistentId(igniteInstanceName); - - ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); - - zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); - - spis.put(igniteInstanceName, zkSpi); - - if (USE_TEST_CLUSTER) { - assert zkCluster != null; - - zkSpi.setZkConnectionString(zkCluster.getConnectString()); - } - else - zkSpi.setZkConnectionString("localhost:2181"); - - cfg.setDiscoverySpi(zkSpi); - - CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); - - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - - cfg.setCacheConfiguration(ccfg); - - // cfg.setMarshaller(new JdkMarshaller()); - - cfg.setClientMode(client); - - Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); - - lsnrs.put(new IgnitePredicate<Event>() { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - @Override public boolean apply(Event evt) { - try { - DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; - - UUID locId = ignite.cluster().localNode().id(); - - Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId); - - if (nodeEvts == null) { - Object old = evts.put(locId, nodeEvts = new TreeMap<>()); - - assertNull(old); - - synchronized (nodeEvts) { - DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); - - nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); - } - } - - synchronized (nodeEvts) { - DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); - - assertNull(old); - } - } - catch (Throwable e) { - err = true; - - info("Unexpected error: " + e); - } - - return true; - } - }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT}); - - cfg.setLocalEventListeners(lsnrs); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD); - - super.afterTestsStopped(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - if (USE_TEST_CLUSTER) { - zkCluster = new TestingCluster(1); - zkCluster.start(); - } - - reset(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - if (zkCluster != null) { - try { - zkCluster.close(); - } - catch (Exception e) { - U.error(log, "Failed to stop Zookeeper client: " + e, e); - } - - zkCluster = null; - } - - super.afterTest(); - - try { - assertFalse("Unexpected error, see log for details", err); - - checkEventsConsistency(); - } - finally { - reset(); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testClientNodesStatus() throws Exception { - startGrid(0); - - for (Ignite node : G.allGrids()) { - assertEquals(0, node.cluster().forClients().nodes().size()); - assertEquals(1, node.cluster().forServers().nodes().size()); - } - - client = true; - - startGrid(1); - - for (Ignite node : G.allGrids()) { - assertEquals(1, node.cluster().forClients().nodes().size()); - assertEquals(1, node.cluster().forServers().nodes().size()); - } - - client = false; - - startGrid(2); - - client = true; - - startGrid(3); - - for (Ignite node : G.allGrids()) { - assertEquals(2, node.cluster().forClients().nodes().size()); - assertEquals(2, node.cluster().forServers().nodes().size()); - } - - stopGrid(1); - - waitForTopology(3); - - for (Ignite node : G.allGrids()) { - assertEquals(1, node.cluster().forClients().nodes().size()); - assertEquals(2, node.cluster().forServers().nodes().size()); - } - - stopGrid(2); - - waitForTopology(2); - - for (Ignite node : G.allGrids()) { - assertEquals(1, node.cluster().forClients().nodes().size()); - assertEquals(1, node.cluster().forServers().nodes().size()); - } - } - - /** - * @throws Exception If failed. - */ - public void testStopNode_1() throws Exception { - startGrids(5); - - waitForTopology(5); - - stopGrid(3); - - waitForTopology(4); - - startGrid(3); - - waitForTopology(5); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEventsSimple1_SingleNode() throws Exception { - Ignite srv0 = startGrid(0); - - srv0.createCache(new CacheConfiguration<>("c1")); - - waitForEventsAcks(srv0); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEventsSimple1_5_Nodes() throws Exception { - Ignite srv0 = startGrids(5); - - srv0.createCache(new CacheConfiguration<>("c1")); - - awaitPartitionMapExchange(); - - waitForEventsAcks(srv0); - } - - /** - * @throws Exception If failed. - */ - public void testSegmentation1() throws Exception { - sesTimeout = 1000; - testSockNio = true; - - Ignite node0 = startGrid(0); - - final CountDownLatch l = new CountDownLatch(1); - - node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event event) { - l.countDown(); - - return false; - } - }, EventType.EVT_NODE_SEGMENTED); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(true); - - for (int i = 0; i < 10; i++) { - Thread.sleep(1_000); - - if (l.getCount() == 0) - break; - } - - info("Allow connect"); - - c0.allowConnect(); - - assertTrue(l.await(10, TimeUnit.SECONDS)); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore1() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGrid(1); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore2() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGridsMultiThreaded(1, 5); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_NonCoordinator1() throws Exception { - connectionRestore_NonCoordinator(false); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_NonCoordinator2() throws Exception { - connectionRestore_NonCoordinator(true); - } - - /** - * @param failWhenDisconnected {@code True} if fail node while another node is disconnected. - * @throws Exception If failed. - */ - private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - Ignite node1 = startGrid(1); - - ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1); - - c1.closeSocket(true); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - try { - startGrid(2); - } - catch (Exception e) { - info("Start error: " + e); - } - - return null; - } - }, "start-node"); - - checkEvents(node0, joinEvent(3)); - - if (failWhenDisconnected) { - ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2)); - - closeZkClient(spi); - - checkEvents(node0, failEvent(4)); - } - - c1.allowConnect(); - - checkEvents(ignite(1), joinEvent(3)); - - if (failWhenDisconnected) { - checkEvents(ignite(1), failEvent(4)); - - IgnitionEx.stop(getTestIgniteInstanceName(2), true, true); - } - - fut.get(); - - waitForTopology(failWhenDisconnected ? 2 : 3); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator1() throws Exception { - connectionRestore_Coordinator(1, 1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator1_1() throws Exception { - connectionRestore_Coordinator(1, 1, 1); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator2() throws Exception { - connectionRestore_Coordinator(1, 3, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator3() throws Exception { - connectionRestore_Coordinator(3, 3, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator4() throws Exception { - connectionRestore_Coordinator(3, 3, 1); - } - - /** - * @param initNodes Number of initially started nodes. - * @param startNodes Number of nodes to start after coordinator loose connection. - * @param failCnt Number of nodes to stop after coordinator loose connection. - * @throws Exception If failed. - */ - private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception { - sesTimeout = 30_000; - testSockNio = true; - - Ignite node0 = startGrids(initNodes); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(true); - - final AtomicInteger nodeIdx = new AtomicInteger(initNodes); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - try { - startGrid(nodeIdx.getAndIncrement()); - } - catch (Exception e) { - error("Start failed: " + e); - } - - return null; - } - }, startNodes, "start-node"); - - int cnt = 0; - - DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt]; - - int expEvtCnt = 0; - - sesTimeout = 1000; - - List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>(); - - final List<String> failedZkNodes = new ArrayList<>(failCnt); - - for (int i = initNodes; i < initNodes + startNodes; i++) { - ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); - - ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, "impl"); - - impl.waitConnectStart(); - - if (cnt++ < failCnt) { - ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i)); - - c.closeSocket(true); - - blockedC.add(c); - - failedZkNodes.add((String)GridTestUtils.getFieldValue(impl, "locNodeZkPath")); - } - else { - expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1); - - expEvtCnt++; - } - } - - final ZookeeperClient zkClient = new ZookeeperClient(log, zkCluster.getConnectString(), 10_000, null); - - try { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - List<String> c = zkClient.getChildren("/apacheIgnite/default/alive"); - - for (String failedZkNode : failedZkNodes) { - if (c.contains(failedZkNode)) - return false; - } - - return true; - } - catch (Exception e) { - fail(); - - return true; - } - } - }, 10_000)); - } - finally { - zkClient.close(); - } - - c0.allowConnect(); - - for (ZkTestClientCnxnSocketNIO c : blockedC) - c.allowConnect(); - - if (expEvts.length > 0) { - for (int i = 0; i < initNodes; i++) - checkEvents(ignite(i), expEvts); - } - - fut.get(); - - waitForTopology(initNodes + startNodes - failCnt); - } - - /** - * @throws Exception If failed. - */ - public void testClusterRestart() throws Exception { - startGridsMultiThreaded(3, false); - - stopAllGrids(); - - evts.clear(); - - startGridsMultiThreaded(3, false); - - waitForTopology(3); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore4() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGrid(1); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_1_Node() throws Exception { - startGrid(0); - - waitForTopology(1); - - stopGrid(0); - } - - /** - * @throws Exception If failed. - */ - public void testRestarts_2_Nodes() throws Exception { - startGrid(0); - - for (int i = 0; i < 10; i++) { - info("Iteration: " + i); - - startGrid(1); - - waitForTopology(2); - - stopGrid(1); - } - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_2_Nodes_WithCache() throws Exception { - startGrids(2); - - for (Ignite node : G.allGrids()) { - IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); - - assertNotNull(cache); - - for (int i = 0; i < 100; i++) { - cache.put(i, node.name()); - - assertEquals(node.name(), cache.get(i)); - } - } - - awaitPartitionMapExchange(); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_2_Nodes() throws Exception { - startGrid(0); - - waitForTopology(1); - - startGrid(1); - - waitForTopology(2); - - for (Ignite node : G.allGrids()) - node.compute().broadcast(new DummyCallable(null)); - - awaitPartitionMapExchange(); - - waitForEventsAcks(ignite(0)); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop1() throws Exception { - startGridsMultiThreaded(5, false); - - waitForTopology(5); - - awaitPartitionMapExchange(); - - waitForEventsAcks(ignite(0)); - - stopGrid(0); - - waitForTopology(4); - - for (Ignite node : G.allGrids()) - node.compute().broadcast(new DummyCallable(null)); - - startGrid(0); - - waitForTopology(5); - - awaitPartitionMapExchange(); - - waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes()))); - } - - /** - * @param node Node. - * @throws Exception If failed. - */ - private void waitForEventsAcks(final Ignite node) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(), - "impl", "evtsData", "evts"); - - if (!evts.isEmpty()) { - info("Unacked events: " + evts); - - return false; - } - - return true; - } - }, 10_000)); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop2() throws Exception { - startGridsMultiThreaded(10, false); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer idx) { - stopGrid(idx); - } - }, 3, "stop-node-thread"); - - waitForTopology(7); - - startGridsMultiThreaded(0, 3); - - waitForTopology(10); - } - - /** - * @throws Exception If failed. - */ - public void testStartStopWithClients() throws Exception { - final int SRVS = 3; - - startGrids(SRVS); - - client = true; - - final int THREADS = 30; - - for (int i = 0; i < 5; i++) { - info("Iteration: " + i); - - startGridsMultiThreaded(SRVS, THREADS); - - waitForTopology(SRVS + THREADS); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer idx) { - stopGrid(idx + SRVS); - } - }, THREADS, "stop-node"); - - waitForTopology(SRVS); - - checkEventsConsistency(); - } - } - - /** - * - */ - private void reset() { - System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); - - ZkTestClientCnxnSocketNIO.reset(); - - System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); - - err = false; - - evts.clear(); - } - - /** - * @throws Exception If failed. - */ - private void checkEventsConsistency() throws Exception { - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { - UUID nodeId = nodeEvtEntry.getKey(); - Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); - - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { - if (!nodeId.equals(nodeEvtEntry0.getKey())) { - Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); - - synchronized (nodeEvts) { - synchronized (nodeEvts0) { - checkEventsConsistency(nodeEvts, nodeEvts0); - } - } - } - } - } - } - - /** - * @param evts1 Received events. - * @param evts2 Received events. - */ - private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) { - for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) { - DiscoveryEvent evt1 = e1.getValue(); - DiscoveryEvent evt2 = evts2.get(e1.getKey()); - - if (evt2 != null) { - assertEquals(evt1.topologyVersion(), evt2.topologyVersion()); - assertEquals(evt1.eventNode(), evt2.eventNode()); - assertEquals(evt1.topologyNodes(), evt2.topologyNodes()); - } - } - } - - /** - * @param nodeName Node name. - * @return Node's discovery SPI. - * @throws Exception If failed. - */ - private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception { - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return spis.contains(nodeName); - } - }, 5000); - - ZookeeperDiscoverySpi spi = spis.get(nodeName); - - assertNotNull("Failed to get SPI for node: " + nodeName, spi); - - return spi; - } - - private static DiscoveryEvent joinEvent(long topVer) { - DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); - - expEvt.topologySnapshot(topVer, null); - - return expEvt; - } - - private static DiscoveryEvent failEvent(long topVer) { - DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); - - expEvt.topologySnapshot(topVer, null); - - return expEvt; - } - - /** - * @param node Node. - * @param expEvts Expected events. - * @throws Exception If fialed. - */ - private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception { - checkEvents(node.cluster().localNode().id(), expEvts); - } - - /** - * @param nodeId Node ID. - * @param expEvts Expected events. - * @throws Exception If failed. - */ - private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); - - if (nodeEvts == null) { - info("No events for node: " + nodeId); - - return false; - } - - synchronized (nodeEvts) { - for (DiscoveryEvent expEvt : expEvts) { - DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion()); - - if (evt0 == null) { - info("No event for version: " + expEvt.topologyVersion()); - - return false; - } - - assertEquals(expEvt.type(), evt0.type()); - } - } - - return true; - } - }, 10000)); - } - - /** - * @param spi Spi instance. - */ - private void closeZkClient(ZookeeperDiscoverySpi spi) { - ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "zkClient", "zk"); - - try { - zk.close(); - } - catch (Exception e) { - fail("Unexpected error: " + e); - } - } - - /** - * @param expSize Expected nodes number. - * @throws Exception If failed. - */ - private void waitForTopology(final int expSize) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - List<Ignite> nodes = G.allGrids(); - - if (nodes.size() != expSize) { - info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']'); - - return false; - } - - for (Ignite node: nodes) { - int sizeOnNode = node.cluster().nodes().size(); - - if (sizeOnNode != expSize) { - info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); - - return false; - } - } - - return true; - } - }, 5000)); - } - - /** - * - */ - private static class DummyCallable implements IgniteCallable<Object> { - /** */ - private byte[] data; - - /** - * @param data Data. - */ - DummyCallable(byte[] data) { - this.data = data; - } - - /** {@inheritDoc} */ - @Override public Object call() throws Exception { - return data; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java deleted file mode 100644 index c8886af..0000000 --- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.logger.java.JavaLogger; -import org.apache.ignite.testframework.GridTestUtils; - -/** - * - */ -public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { - /** */ - public static final IgniteLogger log = new JavaLogger().getLogger(ZkTestClientCnxnSocketNIO.class); - - /** */ - public volatile CountDownLatch blockConnectLatch; - - /** */ - public static ConcurrentHashMap<String, ZkTestClientCnxnSocketNIO> clients = new ConcurrentHashMap<>(); - - /** */ - private final String nodeName; - - /** - * - */ - public static void reset() { - clients.clear(); - } - - /** - * @param node Node. - * @return ZK client. - */ - public static ZkTestClientCnxnSocketNIO forNode(Ignite node) { - return clients.get(node.name()); - } - - /** - * @param instanceName Ignite instance name. - * @return ZK client. - */ - public static ZkTestClientCnxnSocketNIO forNode(String instanceName) { - return clients.get(instanceName); - } - - /** - * @throws IOException If failed. - */ - public ZkTestClientCnxnSocketNIO() throws IOException { - super(); - - String threadName = Thread.currentThread().getName(); - - nodeName = threadName.substring(threadName.indexOf('-') + 1); - - log.info("ZkTestClientCnxnSocketNIO created for node: " + nodeName); - } - - /** {@inheritDoc} */ - @Override void connect(InetSocketAddress addr) throws IOException { - CountDownLatch blockConnect = this.blockConnectLatch; - - log.info("ZkTestClientCnxnSocketNIO connect [node=" + nodeName + ", addr=" + addr + ']'); - - if (blockConnect != null && blockConnect.getCount() > 0) { - try { - log.info("ZkTestClientCnxnSocketNIO block connect"); - - blockConnect.await(); - - log.info("ZkTestClientCnxnSocketNIO finish block connect"); - } - catch (Exception e) { - log.error("Error in ZkTestClientCnxnSocketNIO: " + e, e); - } - } - - super.connect(addr); - - clients.put(nodeName, this); - } - - /** - * - */ - public void allowConnect() { - assert blockConnectLatch != null && blockConnectLatch.getCount() == 1; - - log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + ']'); - - blockConnectLatch.countDown(); - } - - /** - * @param blockConnect {@code True} to block client reconnect. - * @throws Exception If failed. - */ - public void closeSocket(boolean blockConnect) throws Exception { - if (blockConnect) - blockConnectLatch = new CountDownLatch(1); - - log.info("ZkTestClientCnxnSocketNIO closeSocket [node=" + nodeName + ", block=" + blockConnect + ']'); - - SelectionKey k = GridTestUtils.getFieldValue(this, ClientCnxnSocketNIO.class, "sockKey"); - - k.channel().close(); - } -}