Repository: ignite Updated Branches: refs/heads/ignite-3478 62dbb11d5 -> 6d747761e
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d747761 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d747761 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d747761 Branch: refs/heads/ignite-3478 Commit: 6d747761eeb3c4c8d29e42d61bb0e0ffa2fdac10 Parents: 62dbb11 Author: sboikov <[email protected]> Authored: Mon Sep 18 13:11:00 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 18 15:39:34 2017 +0300 ---------------------------------------------------------------------- .../cache/mvcc/CacheMvccTransactionsTest.java | 210 ++++++++++++++++++- 1 file changed, 204 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6d747761/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 002da40..99ce163 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; import org.apache.ignite.internal.util.lang.GridInClosure3; @@ -113,9 +114,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - verifyCoordinatorInternalState(); - - stopAllGrids(); + try { + verifyCoordinatorInternalState(); + } + finally { + stopAllGrids(); + } } /** @@ -1124,6 +1128,164 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed + */ + public void testOperationsSequenceConsistency_SingleNode() throws Exception { + operationsSequenceConsistency(1, 0, 0, 64); + } + + /** + * TODO IGNITE-3478: enable when scan is fully implemented. + * + * @throws Exception If failed + */ +// public void testOperationsSequenceConsistency_ClientServer_Backups0() throws Exception { +// operationsSequenceConsistency(4, 2, 0, 64); +// } + + /** + * @param srvs Number of server nodes. + * @param clients Number of client nodes. + * @param cacheBackups Number of cache backups. + * @param cacheParts Number of cache partitions. + * @throws Exception If failed. + */ + private void operationsSequenceConsistency( + final int srvs, + final int clients, + int cacheBackups, + int cacheParts + ) + throws Exception + { + final int writers = 4; + + final int readers = 4; + + final long time = 10_000; + + final AtomicInteger keyCntr = new AtomicInteger(); + + GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cnt = 0; + + while (!stop.get()) { + IgniteCache<Integer, Value> cache = randomCache(caches, rnd); + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + Integer key = keyCntr.incrementAndGet(); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, new Value(idx, cnt++)); + + tx.commit(); + } + + if (key > 1_000_000) + break; + } + + info("Writer finished, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + IgniteCache<Integer, Value> cache = randomCache(caches, rnd); + + Map<Integer, TreeSet<Integer>> vals = new HashMap<>(); + + for (IgniteCache.Entry<Integer, Value> e : cache) { + Value val = e.getValue(); + + assertNotNull(val); + + TreeSet<Integer> cntrs = vals.get(val.key); + + if (cntrs == null) + vals.put(val.key, cntrs = new TreeSet<>()); + + boolean add = cntrs.add(val.cnt); + + assertTrue(add); + } + + for (TreeSet<Integer> readCntrs : vals.values()) { + for (int i = 0; i < readCntrs.size(); i++) + assertTrue(readCntrs.contains(i)); + } + } + } + }; + + readWriteTest(srvs, + clients, + cacheBackups, + cacheParts, + writers, + readers, + time, + null, + writer, + reader); + } + + /** + * @throws Exception If failed. + */ + public void testActiveQueryCleanupOnNodeFailure() throws Exception { + testSpi = true; + + final Ignite srv = startGrid(0); + + srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1024)); + + client = true; + + final Ignite client = startGrid(1); + + TestRecordingCommunicationSpi srvSpi = TestRecordingCommunicationSpi.spi(srv); + + srvSpi.blockMessages(GridNearGetResponse.class, getTestIgniteInstanceName(1)); + + TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class, + getTestIgniteInstanceName(0)); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteCache cache = client.cache(DEFAULT_CACHE_NAME); + + cache.getAll(F.asSet(1, 2, 3)); + + return null; + } + }); + + srvSpi.waitForBlocked(); + + assertFalse(fut.isDone()); + + stopGrid(1); + + verifyCoordinatorInternalState(); + + try { + fut.get(); + } + catch (Exception ignore) { + // No-op. + } + } + + /** * @param N Number of object to update in single transaction. * @param srvs Number of server nodes. * @param clients Number of client nodes. @@ -1450,11 +1612,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** - * + * @throws Exception If failed. */ - private void verifyCoordinatorInternalState() { + private void verifyCoordinatorInternalState() throws Exception { for (Ignite node : G.allGrids()) { - CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators(); + final CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators(); Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs"); @@ -1467,6 +1629,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { Map ackFuts = GridTestUtils.getFieldValue(crd, "ackFuts"); assertTrue(ackFuts.isEmpty()); + + // TODO IGNITE-3478 +// assertTrue(GridTestUtils.waitForCondition( +// new GridAbsPredicate() { +// @Override public boolean apply() { +// Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries"); +// +// return activeQrys.isEmpty(); +// } +// }, 5000) +// ); } } @@ -1509,6 +1682,31 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * */ + static class Value { + /** */ + int key; + + /** */ + int cnt; + + /** + * @param key Key. + * @param cnt Update count. + */ + Value(int key, int cnt) { + this.key = key; + this.cnt = cnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Value.class, this); + } + } + + /** + * + */ enum ReadMode { /** */ GET_ALL,
