Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 b14b73e85 -> 4b90d91ab
IGNITE-901 Added test for collections. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b90d91a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b90d91a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b90d91a Branch: refs/heads/ignite-901 Commit: 4b90d91ab95099efa75dd43160c0038432af95c3 Parents: b14b73e Author: nikolay_tikhonov <[email protected]> Authored: Fri Jul 3 17:24:55 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Jul 3 17:24:55 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientReconnectAtomicsTest.java | 6 +- .../IgniteClientReconnectCollectionsTest.java | 350 +++++++++++++++++-- 2 files changed, 319 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b90d91a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index a827671..e629d0a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -24,7 +24,7 @@ import org.apache.ignite.testframework.*; import java.util.concurrent.*; /** - * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect. + * */ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest { /** {@inheritDoc} */ @@ -128,7 +128,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr srvAtomicSeq.batchSize(1); - commSpi.msgClass = GridNearLockResponse.class; + commSpi.blockMsg(GridNearLockResponse.class); final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { @@ -529,7 +529,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr final IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongInProggress", 0, false); - commSpi.msgClass = GridNearLockResponse.class; + commSpi.blockMsg(GridNearLockResponse.class); final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b90d91a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index fcb74cd..54e1329 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -19,18 +19,17 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.testframework.*; import java.util.concurrent.*; -import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.events.EventType.*; /** - * TODO IGNITE-901: test for queue, check set/queue usage after remove, test API block, fail current call on disconnect. + * */ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest { /** {@inheritDoc} */ @@ -46,6 +45,63 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA /** * @throws Exception If failed. */ + public void testQueueReconnect() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + queueReconnect(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + queueReconnect(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testQueueReconnectRemoved() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + queueReconnectRemoved(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + queueReconnectRemoved(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testQueueReconnectInProg() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + queueReconnectInProg(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + queueReconnectInProg(colCfg); + } + + /** + * @throws Exception If failed. + */ public void testSetReconnect() throws Exception { CollectionConfiguration colCfg = new CollectionConfiguration(); @@ -63,6 +119,44 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA } /** + * @throws Exception If failed. + */ + public void testSetReconnectRemove() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + setReconnectRemove(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + setReconnectRemove(colCfg); + } + + /** + * @throws Exception If failed. + */ + public void testSetReconnectInProg() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + setReconnectInProg(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + setReconnectInProg(colCfg); + } + + /** * @param colCfg Collection configuration. * @throws Exception If failed. */ @@ -73,60 +167,248 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); - final String setName = "set-" + colCfg.getAtomicityMode(); IgniteSet<String> clientSet = client.set(setName, colCfg); - IgniteSet<String> srvSet = srv.set(setName, null); + final IgniteSet<String> srvSet = srv.set(setName, null); + + assertTrue(clientSet.add("1")); + + assertFalse(srvSet.add("1")); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertTrue(srvSet.add("2")); + } + }); + + assertFalse(clientSet.add("2")); + + assertTrue(clientSet.remove("2")); + + assertFalse(srvSet.contains("2")); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void setReconnectRemove(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final String setName = "set-rm-" + colCfg.getAtomicityMode(); + + final IgniteSet<String> clientSet = client.set(setName, colCfg); + + final IgniteSet<String> srvSet = srv.set(setName, null); assertTrue(clientSet.add("1")); assertFalse(srvSet.add("1")); - final CountDownLatch disconnectLatch = new CountDownLatch(1); - final CountDownLatch reconnectLatch = new CountDownLatch(1); + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvSet.close(); + } + }); - final TestTcpDiscoverySpi clientSpi = spi(client); + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientSet.add("fail"); - log.info("Block reconnect."); + return null; + } + }, IllegalStateException.class, null); + } - clientSpi.writeLatch = new CountDownLatch(1); + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void setReconnectInProg(final CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); + final Ignite srv = clientRouter(client); - disconnectLatch.countDown(); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); + final String setName = "set-in-progress-" + colCfg.getAtomicityMode(); + + final IgniteSet<String> clientSet = client.set(setName, colCfg); + + final IgniteSet<String> srvSet = srv.set(setName, null); + + assertTrue(clientSet.add("1")); + + assertFalse(srvSet.add("1")); - reconnectLatch.countDown(); - } + BlockTpcCommunicationSpi commSpi = commSpi(srv); - return true; + if (colCfg.getAtomicityMode() == ATOMIC) + commSpi.blockMsg(GridNearAtomicUpdateResponse.class); + else + commSpi.blockMsg(GridNearTxPrepareResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return clientSet.add("2"); } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + }); - srvSpi.failNode(client.cluster().localNode().id(), null); + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); - assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + assertNotDone(fut); - assertTrue(srvSet.add("2")); + commSpi.unblockMsg(); - log.info("Allow reconnect."); + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + // Check that future failed. + assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); + } + }); - clientSpi.writeLatch.countDown(); + assertTrue(clientSet.add("3")); - assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + assertFalse(srvSet.add("3")); - assertFalse(clientSet.add("2")); + srvSet.close(); + } - assertTrue(clientSet.remove("2")); + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void queueReconnect(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); - assertFalse(srvSet.contains("2")); + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final String setName = "queue-" + colCfg.getAtomicityMode(); + + IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg); + + final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null); + + assertTrue(clientQueue.offer("1")); + + assertTrue(srvQueue.contains("1")); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + assertTrue(srvQueue.add("2")); + } + }); + + assertTrue(clientQueue.contains("2")); + + assertEquals("1", clientQueue.poll()); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void queueReconnectRemoved(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final String setName = "queue-rmv" + colCfg.getAtomicityMode(); + + final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg); + + final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null); + + assertTrue(clientQueue.add("1")); + + assertTrue(srvQueue.add("2")); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvQueue.close(); + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientQueue.add("fail"); + + return null; + } + }, IllegalStateException.class, null); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void queueReconnectInProg(final CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + final String setName = "queue-rmv" + colCfg.getAtomicityMode(); + + final IgniteQueue<String> clientQueue = client.queue(setName, 10, colCfg); + + final IgniteQueue<String> srvQueue = srv.queue(setName, 10, null); + + assertTrue(clientQueue.offer("1")); + + assertTrue(srvQueue.contains("1")); + + BlockTpcCommunicationSpi commSpi = commSpi(srv); + + if (colCfg.getAtomicityMode() == ATOMIC) + commSpi.blockMsg(GridNearAtomicUpdateResponse.class); + else + commSpi.blockMsg(GridNearTxPrepareResponse.class); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return clientQueue.add("2"); + } + }); + + // Check that client waiting operation. + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return fut.get(200); + } + }, IgniteFutureTimeoutCheckedException.class, null); + + assertNotDone(fut); + + commSpi.unblockMsg(); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + // Check that future failed. + assertNotNull("Future was not failed. Atomic mode: " + colCfg.getAtomicityMode() + ".", fut.error()); + assertEquals(IgniteClientDisconnectedException.class, fut.error().getClass()); + } + }); + + assertTrue(clientQueue.add("3")); + + assertEquals("1", clientQueue.poll()); } }
