Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 807ceb380 -> 669ab1371
IGNITE-901 Added tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/669ab137 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/669ab137 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/669ab137 Branch: refs/heads/ignite-901 Commit: 669ab1371f5dc211cb72ff596b7dc59a49f3dd9d Parents: 807ceb3 Author: nikolay_tikhonov <[email protected]> Authored: Fri Jul 3 13:29:11 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Jul 3 13:29:11 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientReconnectAbstractTest.java | 81 +++ .../IgniteClientReconnectApiBlockTest.java | 533 +++++++++++++++-- .../IgniteClientReconnectAtomicsTest.java | 579 ++++++++++++++++++- .../IgniteClientReconnectFailoverSelfTest.java | 290 ++++++++++ .../IgniteClientReconnectTestSuite.java | 1 + 5 files changed, 1435 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 23b8a15..937104f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -18,19 +18,26 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.internal.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; import org.apache.ignite.testframework.junits.common.*; +import org.eclipse.jetty.util.*; import java.io.*; import java.net.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * @@ -53,6 +60,10 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra cfg.setDiscoverySpi(disco); + BlockTpcCommunicationSpi commSpi = new BlockTpcCommunicationSpi(log); + + cfg.setCommunicationSpi(commSpi); + if (clientMode) cfg.setClientMode(true); @@ -79,6 +90,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); } + /** + * @param ignite Node. + * @return Communication SPI. + */ + protected BlockTpcCommunicationSpi commSpi(Ignite ignite) { + return ((BlockTpcCommunicationSpi)ignite.configuration().getCommunicationSpi()); + } + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); @@ -157,4 +176,66 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra super.writeToSocket(sock, msg); } } + + /** + * + */ + protected static class BlockTpcCommunicationSpi extends TcpCommunicationSpi { + /** */ + volatile Class msgClass; + + AtomicBoolean collectStart = new AtomicBoolean(false); + + ConcurrentHashSet<String> classes = new ConcurrentHashSet<>(); + + /** */ + protected IgniteLogger log; + + /** + * @param log Logger. + */ + public BlockTpcCommunicationSpi(IgniteLogger log) { + this.log = log; + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + Class msgClass0 = msgClass; + + if (collectStart.get() && msg instanceof GridIoMessage) + classes.add(((GridIoMessage)msg).message().getClass().getName()); + + if (msgClass0 != null && msg instanceof GridIoMessage + && ((GridIoMessage)msg).message().getClass().equals(msgClass)) { + log.info("Block message: " + msg); + + return; + } + + super.sendMessage(node, msg); + } + + /** + * @param clazz Class of messages which will be block. + */ + public void blockMsg(Class clazz) { + msgClass = clazz; + } + + /** + * Unlock all message. + */ + public void unblockMsg() { + msgClass = null; + } + + public void start() { + collectStart.set(true); + } + + public void print() { + for (String aClass : classes) + log.error(aClass); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java index 164f6c8..f9522a0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiBlockTest.java @@ -18,12 +18,16 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; import org.apache.ignite.testframework.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.*; @@ -50,12 +54,453 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst /** * @throws Exception If failed. */ - @SuppressWarnings("unchecked") public void testIgniteBlockOnDisconnect() throws Exception { + // Check cache operations. + cacheOperationsTest(); + + // Check cache operations. + beforeTestsStarted(); + dataStructureOperationsTest(); + + // Check ignite operations. + beforeTestsStarted(); + igniteOperationsTest(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void dataStructureOperationsTest() throws Exception { clientMode = true; final Ignite client = startGrid(serverCount()); + doTestIgniteOperationOnDisconnect(client, Arrays.asList( + // Check atomic long. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.atomicLong("testAtomic", 41, true); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteAtomicLong atomicLong = (IgniteAtomicLong)o; + + assertEquals(42, atomicLong.incrementAndGet()); + + return true; + } + } + ), + // Check set. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.set("testSet", new CollectionConfiguration()); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteSet set = (IgniteSet)o; + + String val = "testVal"; + + set.add(val); + + assertEquals(1, set.size()); + assertTrue(set.contains(val)); + + return true; + } + } + ), + // Check ignite queue. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.queue("TestQueue", 10, new CollectionConfiguration()); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteQueue queue = (IgniteQueue)o; + + String val = "Test"; + + queue.add(val); + + assertEquals(val, queue.poll()); + + return true; + } + } + ) + )); + + clientMode = false; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void cacheOperationsTest() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + final IgniteCache<Object, Object> defaultCache = client.cache(null); + + assertNotNull(defaultCache); + + doTestIgniteOperationOnDisconnect(client, Arrays.asList( + // Check put and get operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return defaultCache.getAndPut(9999, 9999); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNull(o); + + assertEquals(9999, defaultCache.get(9999)); + + return true; + } + } + ), + // Check put operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + defaultCache.put(10000, 10000); + + return true; + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertTrue((Boolean)o); + + assertEquals(10000, defaultCache.get(10000)); + + return true; + } + } + ), + // Check get operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return defaultCache.get(10001); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNull(o); + + return true; + } + } + ), + // Check invoke operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return defaultCache.invoke(10000, new CacheEntryProcessor<Object, Object, Object>() { + @Override public Object process(MutableEntry<Object, Object> entry, + Object... arguments) throws EntryProcessorException { + assertTrue(entry.exists()); + + return (int)entry.getValue() * 2; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + assertEquals(20000, (int)o); + + return true; + } + } + ), + // Check put async operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + IgniteCache<Object, Object> async = defaultCache.withAsync(); + + async.put(10002, 10002); + + return async.future().get(); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNull(o); + + assertEquals(10002, defaultCache.get(10002)); + + return true; + } + } + ), + // Check transaction. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.transactions(); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteTransactions txs = (IgniteTransactions)o; + + assertNotNull(txs); + + return true; + } + } + ), + // Check get cache. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.cache(null); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)o; + + assertNotNull(cache0); + + cache0.put(1, 1); + + assertEquals(1, cache0.get(1)); + + return true; + } + } + ), + // Check streamer. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.dataStreamer(null); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)o; + + streamer.addData(2, 2); + + streamer.close(); + + assertEquals(2, client.cache(null).get(2)); + + return true; + } + } + ), + // Check create cache. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.createCache("test_cache"); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteCache<Object, Object> cache = (IgniteCache<Object, Object>)o; + + assertNotNull(cache); + + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + return true; + } + } + ) + + )); + + clientMode = false; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void igniteOperationsTest() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + final IgniteCache<Object, Object> defaultCache = client.cache(null); + + final CountDownLatch recvLatch = new CountDownLatch(1); + + assertNotNull(defaultCache); + + doTestIgniteOperationOnDisconnect(client, Arrays.asList( + // Check compute. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.compute(); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + IgniteCompute comp = (IgniteCompute)o; + + Collection<UUID> uuids = comp.broadcast(new IgniteCallable<UUID>() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public UUID call() throws Exception { + return ignite.cluster().localNode().id(); + } + }); + + for (UUID uuid : uuids) + assertNotNull(uuid); + + return true; + } + } + ), + + // Check ping node. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.cluster().pingNode(new UUID(0, 0)); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + Boolean pingNode = (Boolean)o; + + assertFalse(pingNode); + + return true; + } + } + ), + // Check register remote listener. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.events().remoteListen(null, new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + return true; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + UUID remoteId = (UUID)o; + + assertNotNull(remoteId); + + client.events().stopRemoteListen(remoteId); + + return true; + } + } + ), + // Check message operation. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.message().remoteListen(null, new IgniteBiPredicate<UUID, Object>() { + @Override public boolean apply(UUID uuid, Object o) { + if (o.equals("Test message.")) + recvLatch.countDown(); + + return true; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + IgniteMessaging msg = client.message(); + + msg.send(null, "Test message."); + + try { + assert recvLatch.await(2, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + fail("Message wasn't received."); + } + + return true; + } + } + ), + // Check executor. + new T2<Callable, C1<Object, Boolean>>( + new Callable() { + @Override public Object call() throws Exception { + return client.executorService().submit(new Callable<Integer>() { + @Override public Integer call() throws Exception { + return 42; + } + }); + } + }, + new C1<Object, Boolean>() { + @Override public Boolean apply(Object o) { + assertNotNull(o); + + Future<Integer> fut = (Future<Integer>)o; + + try { + assertEquals(42, (int)fut.get()); + } + catch (Exception e) { + fail("Failed submit task."); + } + + return true; + } + } + ) + )); + + clientMode = false; + } + + /** + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void doTestIgniteOperationOnDisconnect(Ignite client, final List<T2<Callable, C1<Object, Boolean>>> ops) + throws Exception { assertNotNull(client.cache(null)); final TestTcpDiscoverySpi clientSpi = spi(client); @@ -74,8 +519,6 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst final List<IgniteInternalFuture> futs = new ArrayList<>(); - // TODO IGNITE-901 test block for others public API. - client.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { @@ -83,26 +526,12 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst assertEquals(1, reconnectLatch.getCount()); - futs.add(GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - return client.transactions(); - } - })); - - futs.add(GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - return client.cache(null); - } - })); - - futs.add(GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - return client.dataStreamer(null); - } - })); + for (T2<Callable, C1<Object, Boolean>> op : ops) + futs.add(GridTestUtils.runAsync(op.get1())); disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); reconnectLatch.countDown(); @@ -112,46 +541,56 @@ public class IgniteClientReconnectApiBlockTest extends IgniteClientReconnectAbst } }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - log.info("Fail client."); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + try { + log.info("Fail client."); - assertEquals(3, futs.size()); + srvSpi.failNode(client.cluster().localNode().id(), null); - for (IgniteInternalFuture<?> fut : futs) - assertNotDone(fut); + assertTrue(disconnectLatch.await(5000, TimeUnit.MILLISECONDS)); - U.sleep(2000); + assertEquals(ops.size(), futs.size()); - for (IgniteInternalFuture<?> fut : futs) - assertNotDone(fut); + for (IgniteInternalFuture<?> fut : futs) + assertNotDone(fut); - log.info("Allow reconnect."); + U.sleep(2000); - clientSpi.writeLatch.countDown(); + for (IgniteInternalFuture<?> fut : futs) + assertNotDone(fut); - assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); + log.info("Allow reconnect."); - IgniteTransactions txs = (IgniteTransactions)futs.get(0).get(); + clientSpi.writeLatch.countDown(); - assertNotNull(txs); + assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS)); - IgniteCache<Object, Object> cache0 = (IgniteCache<Object, Object>)futs.get(1).get(); + // Check operation after reconnect working. + for (int i = 0; i < futs.size(); i++) { + final int i0 = i; - assertNotNull(cache0); + try { + final Object furRes = futs.get(i0).get(2, TimeUnit.SECONDS); - cache0.put(1, 1); - - assertEquals(1, cache0.get(1)); - - IgniteDataStreamer<Object, Object> streamer = (IgniteDataStreamer<Object, Object>)futs.get(2).get(); + assertTrue(GridTestUtils.runAsync(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return ops.get(i0).get2().apply(furRes); + } + }).get(2, TimeUnit.SECONDS)); + } + catch (IgniteFutureTimeoutCheckedException e) { + e.printStackTrace(); - streamer.addData(2, 2); + fail("Operation timeout. Iteration: " + i + "."); + } + } + } + finally { + clientSpi.writeLatch.countDown(); - streamer.close(); + for (IgniteInternalFuture fut : futs) + fut.cancel(); - assertEquals(2, cache0.get(2)); + stopAllGrids(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index bbb7eef..1a5b795 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; @@ -44,6 +46,494 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr /** * @throws Exception If failed. */ + public void testAtomicReferenceReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRef", "1st value", true); + + assertEquals("1st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", clientAtomicRef.get()); + + IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRef", "1st value", false); + + assertEquals("2st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", srvAtomicRef.get()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + assertEquals("3st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("3st value", "4st value")); + assertEquals("4st value", srvAtomicRef.get()); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertEquals("4st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("4st value", "5st value")); + assertEquals("5st value", clientAtomicRef.get()); + + assertEquals("5st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("5st value", "6st value")); + assertEquals("6st value", srvAtomicRef.get()); + + srvAtomicRef.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReferenceReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteAtomicReference<String> clientAtomicRef = + client.atomicReference("atomicRefRemoved", "1st value", true); + + assertEquals("1st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", clientAtomicRef.get()); + + IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefRemoved", "1st value", false); + + assertEquals("2st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", srvAtomicRef.get()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + srvAtomicRef.close(); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientAtomicRef.compareAndSet("3st value", "4st value"); + + return null; + } + }, IgniteException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReferenceReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteAtomicReference<String> clientAtomicRef = + client.atomicReference("atomicRefInProg", "1st value", true); + + assertEquals("1st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("1st value", "2st value")); + assertEquals("2st value", clientAtomicRef.get()); + + IgniteAtomicReference<String> srvAtomicRef = srv.atomicReference("atomicRefInProg", "1st value", false); + + assertEquals("2st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); + assertEquals("3st value", srvAtomicRef.get()); + + BlockTpcCommunicationSpi servCommSpi = commSpi(srv); + + servCommSpi.blockMsg(GridNearLockResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return clientAtomicRef.compareAndSet("3st value", "4st value"); + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + servCommSpi.unblockMsg(); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + // Check that after reconnect working. + assertEquals("3st value", clientAtomicRef.get()); + assertTrue(clientAtomicRef.compareAndSet("3st value", "4st value")); + assertEquals("4st value", clientAtomicRef.get()); + + assertEquals("4st value", srvAtomicRef.get()); + assertTrue(srvAtomicRef.compareAndSet("4st value", "5st value")); + assertEquals("5st value", srvAtomicRef.get()); + + srvAtomicRef.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicStampedReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true); + + assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, clientAtomicStamped.value()); + assertEquals(1, clientAtomicStamped.stamp()); + + IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStamped", 0, 0, false); + + assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, srvAtomicStamped.value()); + assertEquals(2, srvAtomicStamped.stamp()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + assertEquals(true, srvAtomicStamped.compareAndSet(2, 3, 2, 3)); + assertEquals(3, srvAtomicStamped.value()); + assertEquals(3, srvAtomicStamped.stamp()); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertEquals(true, clientAtomicStamped.compareAndSet(3, 4, 3, 4)); + assertEquals(4, clientAtomicStamped.value()); + assertEquals(4, clientAtomicStamped.stamp()); + + assertEquals(true, srvAtomicStamped.compareAndSet(4, 5, 4, 5)); + assertEquals(5, srvAtomicStamped.value()); + assertEquals(5, srvAtomicStamped.stamp()); + + srvAtomicStamped.close(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicStampedReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true); + + assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, clientAtomicStamped.value()); + assertEquals(1, clientAtomicStamped.stamp()); + + IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedRemoved", 0, 0, false); + + assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, srvAtomicStamped.value()); + assertEquals(2, srvAtomicStamped.stamp()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + srvAtomicStamped.close(); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientAtomicStamped.compareAndSet(2, 3, 2, 3); + + return null; + } + }, IgniteException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicStampedReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true); + + assertEquals(true, clientAtomicStamped.compareAndSet(0, 1, 0, 1)); + assertEquals(1, clientAtomicStamped.value()); + assertEquals(1, clientAtomicStamped.stamp()); + + IgniteAtomicStamped srvAtomicStamped = srv.atomicStamped("atomicStampedInProgress", 0, 0, false); + + assertEquals(true, srvAtomicStamped.compareAndSet(1, 2, 1, 2)); + assertEquals(2, srvAtomicStamped.value()); + assertEquals(2, srvAtomicStamped.stamp()); + + BlockTpcCommunicationSpi servCommSpi = commSpi(srv); + + servCommSpi.blockMsg(GridNearLockResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return clientAtomicStamped.compareAndSet(2, 3, 2, 3); + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + servCommSpi.unblockMsg(); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + // Check that after reconnect working. + assertEquals(true, clientAtomicStamped.compareAndSet(2, 3, 2, 3)); + assertEquals(3, clientAtomicStamped.value()); + assertEquals(3, clientAtomicStamped.stamp()); + + assertEquals(true, srvAtomicStamped.compareAndSet(3, 4, 3, 4)); + assertEquals(4, srvAtomicStamped.value()); + assertEquals(4, srvAtomicStamped.stamp()); + + srvAtomicStamped.close(); + } + + /** + * @throws Exception If failed. + */ public void testAtomicLongReconnect() throws Exception { Ignite client = grid(serverCount()); @@ -141,7 +631,8 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr info("Disconnected: " + evt); disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); reconnectLatch.countDown(); @@ -175,7 +666,91 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr /** * @throws Exception If failed. */ - public void testLatchReconnect1() throws Exception { + public void testAtomicLongReconnectInProgress() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true); + + final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false); + + commSpi.msgClass = GridNearLockResponse.class; + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return clientAtomicLong.getAndAdd(1); + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMsg(); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + // Check that future failed. + assertNotNull(fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + // Check that after reconnect working. + assertEquals(1, clientAtomicLong.addAndGet(1)); + assertEquals(2, srvAtomicLong.addAndGet(1)); + + clientAtomicLong.close(); + } + + /** + * @throws Exception If failed. + */ + public void testLatchReconnect() throws Exception { Ignite client = grid(serverCount()); assertTrue(client.cluster().localNode().isClient()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java new file mode 100644 index 0000000..f938733 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteClientReconnectFailoverSelfTest extends IgniteClientReconnectAbstractTest { + /** */ + public final Integer THREADS = 8; + + /** */ + public final Integer RESTART_CNT = 30; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(new CacheConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** */ + private volatile CyclicBarrier barrier; + + /** + * @throws Exception If failed. + */ + public void testCacheOperationReconnectApi() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + assertNotNull(client.cache(null)); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final AtomicBoolean stop = new AtomicBoolean(false); + + final AtomicLong cntr = new AtomicLong(); + + final IgniteQueue<Object> queue = client.queue("test-queue", 1000, new CollectionConfiguration()); + + final IgniteAtomicLong atomicLong = client.atomicLong("counter", 0, true); + + final IgniteAtomicSequence sequence = client.atomicSequence("sequence", 0, true); + + final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteCache<Integer, Integer> cache = client.cache(null); + + IgniteCompute compute = client.compute(); + + Set<Integer> keys = new TreeSet<>(); + final Map<Integer, Integer> entries = new TreeMap<>(); + + for (int i = 0; i < 50; i++) { + keys.add(i); + entries.put(i, i); + } + + while (!stop.get()) { + cntr.incrementAndGet(); + + try { + // Start cache operations. + for (int i = 0; i < 10; i++) { + cache.put(i, i); + cache.get(i); + cache.remove(i); + + cache.putAll(entries); + + cache.invokeAll(keys, new CacheEntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, + Object... arguments) throws EntryProcessorException { + if (ThreadLocalRandom.current().nextBoolean()) + entry.setValue(entry.getValue() * 100); + else + entry.remove(); + + return entry; + } + }); + } + + try (Transaction tx = client.transactions().txStart()) { + for (int i = 0; i < 10; i++) { + cache.put(i, i); + cache.get(i); + } + + tx.commit(); + } + + // Start async cache operations. + IgniteCache<Integer, Integer> asyncCache = cache.withAsync(); + + for (int i = 0; i < 10; i++) { + asyncCache.put(i, i); + + asyncCache.future().get(); + + asyncCache.get(i); + + asyncCache.future().get(); + } + + // Compute. +// for (int i = 0; i < 10; i++) { +// compute.broadcast(new IgniteCallable<Integer>() { +// @IgniteInstanceResource +// private Ignite ignite; +// +// @Override public Integer call() throws Exception { +// return ignite.cache(null).localSize(); +// } +// }); +// +// compute.broadcast(new IgniteRunnable() { +// @Override public void run() { +// // No-op. +// } +// }); +// +// compute.apply(new C1<String, String>() { +// @Override public String apply(String o) { +// return o.toUpperCase(); +// } +// }, Arrays.asList("a", "b", "c")); +// } + + //Data structures. +// for (int i = 0; i < 10; i++) { +// assert atomicLong.incrementAndGet() >= 0; +// +// queue.offer("Test item"); +// +// if (ThreadLocalRandom.current().nextBoolean()) +// for (int j = 0; j < 50; j++) +// queue.poll(); +// +// assert queue.size() <= 1000; +// +// assert sequence.addAndGet(i + 1) >= 0; +// } + } + catch (CacheException | IgniteException e) { + log.info("Operation failed, ignore: " + e); + } + + if (cntr.get() % 100 == 0) + log.info("Iteration: " + cntr); + + if (barrier != null) + try { + barrier.await(); + } + catch (BrokenBarrierException e) { + log.warning("Broken barrier.", e); + + break; + } + } + + return null; + } + }, THREADS, "test-operation-thread-" + client.name()); + + final AtomicBoolean disconnected = new AtomicBoolean(false); + + final AtomicBoolean reconnected = new AtomicBoolean(false); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + if (!reconnected.get()) + disconnected.set(true); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + if (disconnected.get()) + reconnected.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + for (int i = 0; i < RESTART_CNT; i++) { + U.sleep(2000); + + log.info("Block reconnect."); + + reconnected.set(false); + + disconnected.set(false); + + log.info("Fail client."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return disconnected.get(); + } + }, 5000L); + + barrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + barrier = null; + } + }); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return reconnected.get(); + } + }, 5000L); + + try { + barrier.await(10, TimeUnit.SECONDS); + } + catch (TimeoutException e) { + log.error("Failed. Operation hangs."); + + for (Ignite ignite : G.allGrids()) + dumpCacheDebugInfo(ignite); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = barrier; + + if (barrier0 != null) + barrier0.reset(); + + stop.set(true); + + fail("Failed to wait for update."); + } + } + + stop.set(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/669ab137/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java index 88f0c5f..affbb54 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -33,6 +33,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite { suite.addTestSuite(IgniteClientReconnectStopTest.class); suite.addTestSuite(IgniteClientReconnectApiBlockTest.class); + suite.addTestSuite(IgniteClientReconnectFailoverSelfTest.class); suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class); suite.addTestSuite(IgniteClientReconnectCacheTest.class); suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);
