http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 8bf9e39..9da6876 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -27,13 +27,17 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -58,6 +62,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridInClosure3; @@ -82,14 +88,21 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionOptimisticException; import org.jetbrains.annotations.Nullable; +import org.junit.Assert; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * TODO IGNITE-3478: extend tests to use single/mutiple nodes, all tx types. @@ -182,7 +195,38 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPessimisticTx1() throws Exception { - checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() { + checkTx1(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticSerializableTx1() throws Exception { + checkTx1(OPTIMISTIC, SERIALIZABLE); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticRepeatableReadTx1() throws Exception { + checkTx1(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticReadCommittedTx1() throws Exception { + checkTx1(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void checkTx1(final TransactionConcurrency concurrency, final TransactionIsolation isolation) + throws Exception { + checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() { @Override public void apply(IgniteCache<Integer, Integer> cache) { try { IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); @@ -192,7 +236,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (Integer key : keys) { log.info("Test key: " + key); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { Integer val = cache.get(key); assertNull(val); @@ -222,7 +266,24 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPessimisticTx2() throws Exception { - checkPessimisticTx(new CI1<IgniteCache<Integer, Integer>>() { + checkTx2(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticSerializableTx2() throws Exception { + checkTx2(OPTIMISTIC, SERIALIZABLE); + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void checkTx2(final TransactionConcurrency concurrency, final TransactionIsolation isolation) + throws Exception { + checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() { @Override public void apply(IgniteCache<Integer, Integer> cache) { try { IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); @@ -232,7 +293,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (Integer key : keys) { log.info("Test key: " + key); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { cache.put(key, key); cache.put(key + 1, key + 1); @@ -257,9 +318,15 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @param c Closure to run. * @throws Exception If failed. */ - private void checkPessimisticTx(IgniteInClosure<IgniteCache<Integer, Integer>> c) throws Exception { + private void checkTxWithAllCaches(IgniteInClosure<IgniteCache<Integer, Integer>> c) throws Exception { + client = false; + startGridsMultiThreaded(SRVS); + client = true; + + startGrid(SRVS); + try { for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { logCacheInfo(ccfg); @@ -289,6 +356,21 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testActiveQueriesCleanup() throws Exception { + activeQueriesCleanup(false); + } + + /** + * @throws Exception If failed. + */ + public void testActiveQueriesCleanupTx() throws Exception { + activeQueriesCleanup(true); + } + + /** + * @param tx If {@code true} tests reads inside transaction. + * @throws Exception If failed. + */ + private void activeQueriesCleanup(final boolean tx) throws Exception { startGridsMultiThreaded(SRVS); client = true; @@ -307,7 +389,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { @Override public void apply(Integer idx) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - IgniteCache cache = ignite(idx % NODES).cache(DEFAULT_CACHE_NAME); + Ignite node = ignite(idx % NODES); + + IgniteTransactions txs = node.transactions(); + + IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); while (System.currentTimeMillis() < stopTime) { int keyCnt = rnd.nextInt(10) + 1; @@ -317,7 +403,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < keyCnt; i++) keys.add(rnd.nextInt()); - cache.getAll(keys); + if (tx) { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.getAll(keys); + + if (rnd.nextBoolean()) + tx.commit(); + else + tx.rollback(); + } + } + else + cache.getAll(keys); } } }, NODES * 2, "get-thread"); @@ -329,7 +426,107 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testSimplePutGetAll() throws Exception { + public void testTxReadIsolationSimple() throws Exception { + Ignite srv0 = startGrids(4); + + client = true; + + startGrid(4); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + IgniteCache<Object, Object> cache0 = srv0.createCache(ccfg); + + final Map<Integer, Integer> startVals = new HashMap<>(); + + final int KEYS = 10; + + for (int i = 0; i < KEYS; i++) + startVals.put(i, 0); + + for (final TransactionIsolation isolation : TransactionIsolation.values()) { + for (final Ignite node : G.allGrids()) { + info("Run test [node=" + node.name() + ", isolation=" + isolation + ']'); + + try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache0.putAll(startVals); + + tx.commit(); + } + + final CountDownLatch readStart = new CountDownLatch(1); + + final CountDownLatch readProceed = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, isolation)) { + assertEquals(0, cache.get(0)); + + readStart.countDown(); + + assertTrue(readProceed.await(5, TimeUnit.SECONDS)); + + if (isolation == READ_COMMITTED) { + assertNull(cache.get(1)); + + assertEquals(1, cache.get(2)); + + Map<Object, Object> res = cache.getAll(startVals.keySet()); + + assertEquals(startVals.size() / 2, res.size()); + + for (Map.Entry<Object, Object> e : res.entrySet()) + assertEquals("Invalid value for key: " + e.getKey(), 1, e.getValue()); + } + else { + assertEquals(0, cache.get(1)); + + assertEquals(0, cache.get(2)); + + Map<Object, Object> res = cache.getAll(startVals.keySet()); + + assertEquals(startVals.size(), res.size()); + + for (Map.Entry<Object, Object> e : res.entrySet()) + assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue()); + } + + tx.rollback(); + } + + return null; + } + }); + + assertTrue(readStart.await(5, TimeUnit.SECONDS)); + + for (int i = 0; i < KEYS; i++) { + try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + if (i % 2 == 0) + cache0.put(i, 1); + else + cache0.remove(i); + + tx.commit(); + } + } + + readProceed.countDown(); + + fut.get(); + } + } + + srv0.destroyCache(cache0.getName()); + } + } + + /** + * @throws Exception If failed. + */ + public void testPutGetAllSimple() throws Exception { Ignite node = startGrid(0); IgniteTransactions txs = node.transactions(); @@ -388,22 +585,22 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testSimplePutRemove() throws Exception { - simplePutRemove(false); + public void testPutRemoveSimple() throws Exception { + putRemoveSimple(false); } /** * @throws Exception If failed. */ - public void testSimplePutRemove_LargeKeys() throws Exception { - simplePutRemove(true); + public void testPutRemoveSimple_LargeKeys() throws Exception { + putRemoveSimple(true); } /** * @throws Exception If failed. * @param largeKeys {@code True} to use large keys (not fitting in single page). */ - private void simplePutRemove(boolean largeKeys) throws Exception { + private void putRemoveSimple(boolean largeKeys) throws Exception { Ignite node = startGrid(0); IgniteTransactions txs = node.transactions(); @@ -608,7 +805,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { boolean block = true; @Override public boolean apply(ClusterNode node, Message msg) { - if (block && msg instanceof CoordinatorTxAckRequest) { + if (block && msg instanceof CoordinatorAckRequestTx) { block = false; return true; @@ -804,9 +1001,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (boolean otherPuts : vals) { for (boolean putOnStart : vals) { - cleanupWaitsForGet1(otherPuts, putOnStart); + for (boolean inTx : vals) { + cleanupWaitsForGet1(otherPuts, putOnStart, inTx); - afterTest(); + afterTest(); + } } } } @@ -814,10 +1013,13 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @param otherPuts {@code True} to update unrelated keys to increment mvcc counter. * @param putOnStart {@code True} to put data in cache before getAll. + * @param inTx {@code True} to read inside transaction. * @throws Exception If failed. */ - private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart) throws Exception { - info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + putOnStart + "]"); + private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart, final boolean inTx) throws Exception { + info("cleanupWaitsForGet [otherPuts=" + otherPuts + + ", putOnStart=" + putOnStart + + ", inTx=" + inTx + "]"); testSpi = true; @@ -864,7 +1066,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { @Override public Void call() throws Exception { IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName()); - Map<Integer, Integer> vals = cache.getAll(F.asSet(key1, key2)); + + Map<Integer, Integer> vals; + + if (inTx) { + try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + vals = cache.getAll(F.asSet(key1, key2)); + + tx.rollback(); + } + } + else + vals = cache.getAll(F.asSet(key1, key2)); if (putOnStart) { assertEquals(2, vals.size()); @@ -944,7 +1157,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() { @Override public void apply(ClusterNode node, Message msg) { - if (msg instanceof CoordinatorTxAckRequest) + if (msg instanceof CoordinatorAckRequestTx) doSleep(2000); } }); @@ -1063,7 +1276,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { private boolean blocked; @Override public boolean apply(ClusterNode node, Message msg) { - if (!blocked && (msg instanceof CoordinatorTxAckRequest)) { + if (!blocked && (msg instanceof CoordinatorAckRequestTx)) { blocked = true; return true; @@ -1142,46 +1355,53 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode() throws Exception { - putAllGetAll(false, 1, 0, 0, 64); + putAllGetAll(null, 1, 0, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode_SinglePartition() throws Exception { - putAllGetAll(false, 1, 0, 0, 1); + putAllGetAll(null, 1, 0, 0, 1); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups0() throws Exception { - putAllGetAll(false, 4, 2, 0, 64); + putAllGetAll(null, 4, 2, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups1() throws Exception { - putAllGetAll(false, 4, 2, 1, 64); + putAllGetAll(null, 4, 2, 1, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups2() throws Exception { - putAllGetAll(false, 4, 2, 2, 64); + putAllGetAll(null, 4, 2, 2, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator() throws Exception { - putAllGetAll(true, 4, 2, 1, 64); + putAllGetAll(RestartMode.RESTART_CRD, 4, 2, 1, 64); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllGetAll_ClientServer_Backups1_Restart() throws Exception { + putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 2, 1, 64); } /** - * @param restartCrd Coordinator restart flag. + * @param restartMode Restart mode. * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -1189,7 +1409,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void putAllGetAll( - boolean restartCrd, + RestartMode restartMode, final int srvs, final int clients, int cacheBackups, @@ -1202,9 +1422,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final int readers = 4; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int min = idx * RANGE; @@ -1222,30 +1442,35 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (map.size() < RANGE) map.put(rnd.nextInt(min, max), v); - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - if (updated && rnd.nextBoolean()) { - Map<Integer, Integer> res = cache.getAll(map.keySet()); + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + if (updated && rnd.nextBoolean()) { + Map<Integer, Integer> res = cache.cache.getAll(map.keySet()); - for (Integer k : map.keySet()) - assertEquals(v - 1, (Object)res.get(k)); - } + for (Integer k : map.keySet()) + assertEquals(v - 1, (Object)res.get(k)); + } - cache.putAll(map); + cache.cache.putAll(map); - tx.commit(); + tx.commit(); - updated = true; - } + updated = true; + } - if (rnd.nextBoolean()) { - Map<Integer, Integer> res = cache.getAll(map.keySet()); + if (rnd.nextBoolean()) { + Map<Integer, Integer> res = cache.cache.getAll(map.keySet()); - for (Integer k : map.keySet()) - assertEquals(v, (Object)res.get(k)); + for (Integer k : map.keySet()) + assertEquals(v, (Object)res.get(k)); + } + } + finally { + cache.readUnlock(); } map.clear(); @@ -1257,9 +1482,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); Set<Integer> keys = new LinkedHashSet<>(); @@ -1275,9 +1500,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (keys.size() < RANGE) keys.add(rnd.nextInt(min, max)); - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + + Map<Integer, Integer> map; - Map<Integer, Integer> map = cache.getAll(keys); + try { + map = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); + } assertTrue("Invalid map size: " + map.size(), map.isEmpty() || map.size() == RANGE); @@ -1314,7 +1546,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { }; readWriteTest( - restartCrd, + restartMode, srvs, clients, cacheBackups, @@ -1334,49 +1566,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testAccountsTxGetAll_SingleNode() throws Exception { - accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL); + accountsTxReadAll(1, 0, 0, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_SingleNode_SinglePartition() throws Exception { - accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL); + accountsTxReadAll(1, 0, 0, 1, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() throws Exception { - accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL); + accountsTxReadAll(1, 0, 0, 1, true, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception { - accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL); + accountsTxReadAll(4, 2, 0, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception { - accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL); + accountsTxReadAll(4, 2, 1, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception { - accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL); + accountsTxReadAll(4, 2, 2, 64, false, ReadMode.GET_ALL); } /** * @throws Exception If failed. */ public void testAccountsTxScan_SingleNode_SinglePartition() throws Exception { - accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN); + accountsTxReadAll(1, 0, 0, 1, false, ReadMode.SCAN); } /** @@ -1388,7 +1620,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @param readMode Read mode. * @throws Exception If failed. */ - private void accountsTxGetAll( + private void accountsTxReadAll( final int srvs, final int clients, int cacheBackups, @@ -1425,109 +1657,115 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final Set<Integer> rmvdIds = new HashSet<>(); - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int cnt = 0; while (!stop.get()) { - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); - - cnt++; + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - Integer id1 = rnd.nextInt(ACCOUNTS); - Integer id2 = rnd.nextInt(ACCOUNTS); + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - while (id1.equals(id2)) - id2 = rnd.nextInt(ACCOUNTS); + cnt++; - TreeSet<Integer> keys = new TreeSet<>(); + Integer id1 = rnd.nextInt(ACCOUNTS); + Integer id2 = rnd.nextInt(ACCOUNTS); - keys.add(id1); - keys.add(id2); + while (id1.equals(id2)) + id2 = rnd.nextInt(ACCOUNTS); - Integer cntr1 = null; - Integer cntr2 = null; + TreeSet<Integer> keys = new TreeSet<>(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - MvccTestAccount a1; - MvccTestAccount a2; + keys.add(id1); + keys.add(id2); - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + Integer cntr1 = null; + Integer cntr2 = null; - a1 = accounts.get(id1); - a2 = accounts.get(id2); + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + MvccTestAccount a1; + MvccTestAccount a2; - if (!withRmvs) { - assertNotNull(a1); - assertNotNull(a2); + Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys); - cntr1 = a1.updateCnt + 1; - cntr2 = a2.updateCnt + 1; + a1 = accounts.get(id1); + a2 = accounts.get(id2); - cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1)); - cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2)); - } - else { - if (a1 != null || a2 != null) { - if (a1 != null && a2 != null) { - Integer rmvd = null; + if (!withRmvs) { + assertNotNull(a1); + assertNotNull(a2); - if (rnd.nextInt(10) == 0) { - synchronized (rmvdIds) { - if (rmvdIds.size() < ACCOUNTS / 2) { - rmvd = rnd.nextBoolean() ? id1 : id2; + cntr1 = a1.updateCnt + 1; + cntr2 = a2.updateCnt + 1; - assertTrue(rmvdIds.add(rmvd)); + cache.cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2)); + } + else { + if (a1 != null || a2 != null) { + if (a1 != null && a2 != null) { + Integer rmvd = null; + + if (rnd.nextInt(10) == 0) { + synchronized (rmvdIds) { + if (rmvdIds.size() < ACCOUNTS / 2) { + rmvd = rnd.nextBoolean() ? id1 : id2; + + assertTrue(rmvdIds.add(rmvd)); + } } } - } - if (rmvd != null) { - cache.remove(rmvd); + if (rmvd != null) { + cache.cache.remove(rmvd); - cache.put(rmvd.equals(id1) ? id2 : id1, - new MvccTestAccount(a1.val + a2.val, 1)); + cache.cache.put(rmvd.equals(id1) ? id2 : id1, + new MvccTestAccount(a1.val + a2.val, 1)); + } + else { + cache.cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); + } } else { - cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); - cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); - } - } - else { - if (a1 == null) { - cache.put(id1, new MvccTestAccount(100, 1)); - cache.put(id2, new MvccTestAccount(a2.val - 100, 1)); + if (a1 == null) { + cache.cache.put(id1, new MvccTestAccount(100, 1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 100, 1)); - assertTrue(rmvdIds.remove(id1)); - } - else { - cache.put(id1, new MvccTestAccount(a1.val - 100, 1)); - cache.put(id2, new MvccTestAccount(100, 1)); + assertTrue(rmvdIds.remove(id1)); + } + else { + cache.cache.put(id1, new MvccTestAccount(a1.val - 100, 1)); + cache.cache.put(id2, new MvccTestAccount(100, 1)); - assertTrue(rmvdIds.remove(id2)); + assertTrue(rmvdIds.remove(id2)); + } } } } - } - tx.commit(); - } + tx.commit(); + } - if (!withRmvs) { - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + if (!withRmvs) { + Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys); - MvccTestAccount a1 = accounts.get(id1); - MvccTestAccount a2 = accounts.get(id2); + MvccTestAccount a1 = accounts.get(id1); + MvccTestAccount a2 = accounts.get(id2); - assertNotNull(a1); - assertNotNull(a2); + assertNotNull(a1); + assertNotNull(a2); - assertTrue(a1.updateCnt >= cntr1); - assertTrue(a2.updateCnt >= cntr2); + assertTrue(a1.updateCnt >= cntr1); + assertTrue(a2.updateCnt >= cntr2); + } + } + finally { + cache.readUnlock(); } } @@ -1535,9 +1773,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); Set<Integer> keys = new LinkedHashSet<>(); @@ -1548,21 +1786,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (keys.size() < ACCOUNTS) keys.add(rnd.nextInt(ACCOUNTS)); - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); Map<Integer, MvccTestAccount> accounts; - if (readMode == ReadMode.SCAN) { - accounts = new HashMap<>(); + try { + if (readMode == ReadMode.SCAN) { + accounts = new HashMap<>(); - for (IgniteCache.Entry<Integer, MvccTestAccount> e : cache) { - MvccTestAccount old = accounts.put(e.getKey(), e.getValue()); + for (IgniteCache.Entry<Integer, MvccTestAccount> e : cache.cache) { + MvccTestAccount old = accounts.put(e.getKey(), e.getValue()); - assertNull(old); + assertNull(old); + } } + else + accounts = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); } - else - accounts = cache.getAll(keys); if (!withRmvs) assertEquals(ACCOUNTS, accounts.size()); @@ -1590,9 +1833,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } if (idx == 0) { - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + + Map<Integer, MvccTestAccount> accounts; - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + try { + accounts = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); + } int sum = 0; @@ -1613,7 +1863,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { }; readWriteTest( - false, + null, srvs, clients, cacheBackups, @@ -1629,130 +1879,411 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testUpdate_N_Objects_SingleNode_SinglePartition() throws Exception { - int[] nValues = {3, 5, 10}; - - for (int n : nValues) { - updateNObjectsTest(n, 1, 0, 0, 1, 10_000); - - afterTest(); - } + public void testPessimisticTxReadsSnapshot_SingleNode_SinglePartition() throws Exception { + txReadsSnapshot(1, 0, 0, 1, true); } /** * @throws Exception If failed. */ - public void testUpdate_N_Objects_SingleNode() throws Exception { - int[] nValues = {3, 5, 10}; - - for (int n : nValues) { - updateNObjectsTest(n, 1, 0, 0, 64, 10_000); + public void testPessimisticTxReadsSnapshot_ClientServer() throws Exception { + txReadsSnapshot(4, 2, 1, 64, true); + } - afterTest(); - } + /** + * @throws Exception If failed. + */ + public void testOptimisticTxReadsSnapshot_SingleNode() throws Exception { + txReadsSnapshot(1, 0, 0, 64, false); } /** - * @throws Exception If failed + * @throws Exception If failed. */ - public void testOperationsSequenceConsistency_SingleNode() throws Exception { - operationsSequenceConsistency(1, 0, 0, 64); + public void testOptimisticTxReadsSnapshot_SingleNode_SinglePartition() throws Exception { + txReadsSnapshot(1, 0, 0, 1, false); } /** - * TODO IGNITE-3478: enable when scan is fully implemented. - * - * @throws Exception If failed + * @throws Exception If failed. */ -// public void testOperationsSequenceConsistency_ClientServer_Backups0() throws Exception { -// operationsSequenceConsistency(4, 2, 0, 64); -// } + public void testOptimisticTxReadsSnapshot_ClientServer() throws Exception { + txReadsSnapshot(4, 2, 1, 64, false); + } /** * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. * @param cacheParts Number of cache partitions. + * @param pessimistic If {@code true} uses pessimistic tx, otherwise optimistic. * @throws Exception If failed. */ - private void operationsSequenceConsistency( + private void txReadsSnapshot( final int srvs, final int clients, int cacheBackups, - int cacheParts - ) - throws Exception - { - final int writers = 4; - - final int readers = 4; + int cacheParts, + final boolean pessimistic + ) throws Exception { + final int ACCOUNTS = 20; - final long time = 10_000; + final int ACCOUNT_START_VAL = 1000; - final AtomicInteger keyCntr = new AtomicInteger(); + final int writers = 4; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final int readers = 4; - int cnt = 0; + final TransactionConcurrency concurrency; + final TransactionIsolation isolation; - while (!stop.get()) { - IgniteCache<Integer, Value> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + if (pessimistic) { + concurrency = PESSIMISTIC; + isolation = REPEATABLE_READ; + } + else { + concurrency = OPTIMISTIC; + isolation = SERIALIZABLE; + } - Integer key = keyCntr.incrementAndGet(); + final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() { + @Override public void apply(IgniteCache<Object, Object> cache) { + final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(key, new Value(idx, cnt++)); + Map<Integer, MvccTestAccount> accounts = new HashMap<>(); - tx.commit(); - } + for (int i = 0; i < ACCOUNTS; i++) + accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1)); - if (key > 1_000_000) - break; - } + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cache.putAll(accounts); - info("Writer finished, updates: " + cnt); + tx.commit(); } - }; + } + }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); + int cnt = 0; + while (!stop.get()) { - IgniteCache<Integer, Value> cache = randomCache(caches, rnd); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - Map<Integer, TreeSet<Integer>> vals = new HashMap<>(); + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - for (IgniteCache.Entry<Integer, Value> e : cache) { - Value val = e.getValue(); + cnt++; - assertNotNull(val); + Integer id1 = rnd.nextInt(ACCOUNTS); + Integer id2 = rnd.nextInt(ACCOUNTS); - TreeSet<Integer> cntrs = vals.get(val.key); + while (id1.equals(id2)) + id2 = rnd.nextInt(ACCOUNTS); - if (cntrs == null) - vals.put(val.key, cntrs = new TreeSet<>()); + TreeSet<Integer> keys = new TreeSet<>(); - boolean add = cntrs.add(val.cnt); + keys.add(id1); + keys.add(id2); - assertTrue(add); - } + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + MvccTestAccount a1; + MvccTestAccount a2; - for (TreeSet<Integer> readCntrs : vals.values()) { - for (int i = 0; i < readCntrs.size(); i++) - assertTrue(readCntrs.contains(i)); - } + Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys); + + a1 = accounts.get(id1); + a2 = accounts.get(id2); + + assertNotNull(a1); + assertNotNull(a2); + + cache.cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); + + tx.commit(); + } + } + finally { + cache.readUnlock(); + } + } + + info("Writer finished, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cnt = 0; + + while (!stop.get()) { + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); + + Map<Integer, MvccTestAccount> accounts = new HashMap<>(); + + if (pessimistic) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + int remaining = ACCOUNTS; + + do { + int readCnt = rnd.nextInt(remaining) + 1; + + Set<Integer> readKeys = new TreeSet<>(); + + for (int i = 0; i < readCnt; i++) + readKeys.add(accounts.size() + i); + + Map<Integer, MvccTestAccount> readRes = cache.cache.getAll(readKeys); + + assertEquals(readCnt, readRes.size()); + + accounts.putAll(readRes); + + remaining = ACCOUNTS - accounts.size(); + } + while (remaining > 0); + + validateSum(accounts); + + tx.commit(); + + cnt++; + } + finally { + cache.readUnlock(); + } + } + else { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + int remaining = ACCOUNTS; + + do { + int readCnt = rnd.nextInt(remaining) + 1; + + if (rnd.nextInt(3) == 0) { + for (int i = 0; i < readCnt; i++) { + Integer key = rnd.nextInt(ACCOUNTS); + + MvccTestAccount account = cache.cache.get(key); + + assertNotNull(account); + + accounts.put(key, account); + } + } + else { + Set<Integer> readKeys = new LinkedHashSet<>(); + + for (int i = 0; i < readCnt; i++) + readKeys.add(rnd.nextInt(ACCOUNTS)); + + Map<Integer, MvccTestAccount> readRes = cache.cache.getAll(readKeys); + + assertEquals(readKeys.size(), readRes.size()); + + accounts.putAll(readRes); + } + + remaining = ACCOUNTS - accounts.size(); + } + while (remaining > 0); + + validateSum(accounts); + + cnt++; + + tx.commit(); + } + catch (TransactionOptimisticException ignore) { + // No-op. + } + finally { + cache.readUnlock(); + } + } + } + + info("Reader finished, txs: " + cnt); + } + + /** + * @param accounts Read accounts. + */ + private void validateSum(Map<Integer, MvccTestAccount> accounts) { + int sum = 0; + + for (int i = 0; i < ACCOUNTS; i++) { + MvccTestAccount account = accounts.get(i); + + assertNotNull(account); + + sum += account.val; + } + + assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum); + } + }; + + readWriteTest( + null, + srvs, + clients, + cacheBackups, + cacheParts, + writers, + readers, + DFLT_TEST_TIME, + init, + writer, + reader); + } + + /** + * @throws Exception If failed. + */ + public void testUpdate_N_Objects_SingleNode_SinglePartition() throws Exception { + int[] nValues = {3, 5, 10}; + + for (int n : nValues) { + updateNObjectsTest(n, 1, 0, 0, 1, 10_000); + + afterTest(); + } + } + + /** + * @throws Exception If failed. + */ + public void testUpdate_N_Objects_SingleNode() throws Exception { + int[] nValues = {3, 5, 10}; + + for (int n : nValues) { + updateNObjectsTest(n, 1, 0, 0, 64, 10_000); + + afterTest(); + } + } + + /** + * @throws Exception If failed + */ + public void testOperationsSequenceConsistency_SingleNode() throws Exception { + operationsSequenceConsistency(1, 0, 0, 64); + } + + /** + * TODO IGNITE-3478: enable when scan is fully implemented. + * + * @throws Exception If failed + */ +// public void testOperationsSequenceConsistency_ClientServer_Backups0() throws Exception { +// operationsSequenceConsistency(4, 2, 0, 64); +// } + + /** + * @param srvs Number of server nodes. + * @param clients Number of client nodes. + * @param cacheBackups Number of cache backups. + * @param cacheParts Number of cache partitions. + * @throws Exception If failed. + */ + private void operationsSequenceConsistency( + final int srvs, + final int clients, + int cacheBackups, + int cacheParts + ) + throws Exception + { + final int writers = 4; + + final int readers = 4; + + final long time = 10_000; + + final AtomicInteger keyCntr = new AtomicInteger(); + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cnt = 0; + + while (!stop.get()) { + TestCache<Integer, Value> cache = randomCache(caches, rnd); + + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); + + Integer key = keyCntr.incrementAndGet(); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.cache.put(key, new Value(idx, cnt++)); + + tx.commit(); + } + + if (key > 1_000_000) + break; + } + finally { + cache.readUnlock(); + } + } + + info("Writer finished, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + TestCache<Integer, Value> cache = randomCache(caches, rnd); + + try { + Map<Integer, TreeSet<Integer>> vals = new HashMap<>(); + + for (IgniteCache.Entry<Integer, Value> e : cache.cache) { + Value val = e.getValue(); + + assertNotNull(val); + + TreeSet<Integer> cntrs = vals.get(val.key); + + if (cntrs == null) + vals.put(val.key, cntrs = new TreeSet<>()); + + boolean add = cntrs.add(val.cnt); + + assertTrue(add); + } + + for (TreeSet<Integer> readCntrs : vals.values()) { + for (int i = 0; i < readCntrs.size(); i++) + assertTrue(readCntrs.contains(i)); + } + } + finally { + cache.readUnlock(); + } } } }; readWriteTest( - false, + null, srvs, clients, cacheBackups, @@ -1766,6 +2297,147 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * TODO IGNITE-3478 enable when recovery is implemented. + * + * @throws Exception If failed. + */ + public void _testNodesRestartNoHang() throws Exception { + final int srvs = 4; + final int clients = 4; + final int writers = 6; + final int readers = 2; + + final int KEYS = 100_000; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Map<Integer, Integer> map = new TreeMap<>(); + + int cnt = 0; + + while (!stop.get()) { + int keys = rnd.nextInt(32) + 1; + + while (map.size() < keys) + map.put(rnd.nextInt(KEYS), cnt); + + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); + + TransactionConcurrency concurrency; + TransactionIsolation isolation; + + switch (rnd.nextInt(3)) { + case 0: { + concurrency = PESSIMISTIC; + isolation = REPEATABLE_READ; + + break; + } + case 1: { + concurrency = OPTIMISTIC; + isolation = REPEATABLE_READ; + + break; + } + case 2: { + concurrency = OPTIMISTIC; + isolation = SERIALIZABLE; + + break; + } + default: { + fail(); + + return; + } + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + if (rnd.nextBoolean()) { + Map<Integer, Integer> res = cache.cache.getAll(map.keySet()); + + assertNotNull(res); + } + + cache.cache.putAll(map); + + tx.commit(); + } + catch (TransactionOptimisticException e) { + assertEquals(SERIALIZABLE, isolation); + } + catch (Exception e) { + Assert.assertTrue("Unexpected error: " + e, X.hasCause(e, ClusterTopologyException.class)); + } + } + finally { + cache.readUnlock(); + } + + map.clear(); + + cnt++; + } + + info("Writer done, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Set<Integer> keys = new LinkedHashSet<>(); + + while (!stop.get()) { + int keyCnt = rnd.nextInt(64) + 1; + + while (keys.size() < keyCnt) + keys.add(rnd.nextInt(KEYS)); + + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + + Map<Integer, Integer> map; + + try { + map = cache.cache.getAll(keys); + + assertNotNull(map); + } + finally { + cache.readUnlock(); + } + + keys.clear(); + } + } + }; + + readWriteTest( + RestartMode.RESTART_RND_SRV, + srvs, + clients, + 1, + 256, + writers, + readers, + DFLT_TEST_TIME, + null, + writer, + reader); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); + } + + /** * @throws Exception If failed. */ public void testActiveQueryCleanupOnNodeFailure() throws Exception { @@ -1783,7 +2455,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { srvSpi.blockMessages(GridNearGetResponse.class, getTestIgniteInstanceName(1)); - TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class, + TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorAckRequestQuery.class, getTestIgniteInstanceName(0)); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { @@ -1817,7 +2489,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testSimpleRebalance() throws Exception { + public void testRebalanceSimple() throws Exception { Ignite srv0 = startGrid(0); IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache( @@ -1880,59 +2552,177 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testSimpleRebalanceWithRemovedValues() throws Exception { + public void testRebalanceWithRemovedValuesSimple() throws Exception { Ignite node = startGrid(0); - IgniteTransactions txs = node.transactions(); + IgniteTransactions txs = node.transactions(); + + final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64)); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int k = 0; k < 100; k++) + cache.remove(k); + + tx.commit(); + } + + Map<Object, Object> expVals = new HashMap<>(); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int k = 100; k < 200; k++) { + cache.put(k, k); + + expVals.put(k, k); + } + + tx.commit(); + } + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int k = 100; k < 200; k++) { + if (k % 2 == 0) { + cache.remove(k); + + expVals.remove(k); + } + } + + tx.commit(); + } + + startGrid(1); + + awaitPartitionMapExchange(); + + checkValues(expVals, jcache(1)); + + stopGrid(0); + + checkValues(expVals, jcache(1)); + } + + /** + * @throws Exception If failed. + */ + public void testCoordinatorFailureSimplePessimisticTx() throws Exception { + coordinatorFailureSimple(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testCoordinatorFailureSimpleSerializableTx() throws Exception { + coordinatorFailureSimple(OPTIMISTIC, SERIALIZABLE); + } + + /** + * @throws Exception If failed. + */ + public void testCoordinatorFailureSimpleOptimisticTx() throws Exception { + coordinatorFailureSimple(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void coordinatorFailureSimple( + final TransactionConcurrency concurrency, + final TransactionIsolation isolation + ) throws Exception { + testSpi = true; + + startGrids(3); + + client = true; + + final Ignite client = startGrid(3); + + final IgniteCache cache = client.createCache( + cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)); + + final Integer key1 = primaryKey(jcache(1)); + final Integer key2 = primaryKey(jcache(2)); + + TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(ignite(0)); + + crdSpi.blockMessages(MvccCoordinatorVersionResponse.class, client.name()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try { + try (Transaction tx = client.transactions().txStart(concurrency, isolation)) { + cache.put(key1, 1); + cache.put(key2, 2); + + tx.commit(); + } - final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64)); + fail(); + } + catch (ClusterTopologyException e) { + info("Expected exception: " + e); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - for (int k = 0; k < 100; k++) - cache.remove(k); + assertNotNull(e.retryReadyFuture()); - tx.commit(); - } + e.retryReadyFuture().get(); + } - Map<Object, Object> expVals = new HashMap<>(); + return null; + } + }, "tx-thread"); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - for (int k = 100; k < 200; k++) { - cache.put(k, k); + crdSpi.waitForBlocked(); - expVals.put(k, k); - } + stopGrid(0); - tx.commit(); - } + fut.get(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - for (int k = 100; k < 200; k++) { - if (k % 2 == 0) { - cache.remove(k); + assertNull(cache.get(key1)); + assertNull(cache.get(key2)); - expVals.remove(k); - } - } + try (Transaction tx = client.transactions().txStart(concurrency, isolation)) { + cache.put(key1, 1); + cache.put(key2, 2); tx.commit(); } - startGrid(1); - - awaitPartitionMapExchange(); + assertEquals(1, cache.get(key1)); + assertEquals(2, cache.get(key2)); + } - checkValues(expVals, jcache(1)); + /** + * @throws Exception If failed. + */ + public void testTxPrepareFailureSimplePessimisticTx() throws Exception { + txPrepareFailureSimple(PESSIMISTIC, REPEATABLE_READ); + } - stopGrid(0); + /** + * @throws Exception If failed. + */ + public void testTxPrepareFailureSimpleSerializableTx() throws Exception { + txPrepareFailureSimple(OPTIMISTIC, SERIALIZABLE); + } - checkValues(expVals, jcache(1)); + /** + * @throws Exception If failed. + */ + public void testTxPrepareFailureSimpleOptimisticTx() throws Exception { + txPrepareFailureSimple(OPTIMISTIC, REPEATABLE_READ); } /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. * @throws Exception If failed. */ - public void testCoordinatorFailurePessimisticTx() throws Exception { + private void txPrepareFailureSimple( + final TransactionConcurrency concurrency, + final TransactionIsolation isolation + ) throws Exception { testSpi = true; startGrids(3); @@ -1947,14 +2737,14 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final Integer key1 = primaryKey(jcache(1)); final Integer key2 = primaryKey(jcache(2)); - TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(ignite(0)); + TestRecordingCommunicationSpi srv1Spi = TestRecordingCommunicationSpi.spi(ignite(1)); - crdSpi.blockMessages(MvccCoordinatorVersionResponse.class, client.name()); + srv1Spi.blockMessages(GridNearTxPrepareResponse.class, client.name()); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { try { - try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = client.transactions().txStart(concurrency, isolation)) { cache.put(key1, 1); cache.put(key2, 2); @@ -1965,22 +2755,28 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } catch (ClusterTopologyException e) { info("Expected exception: " + e); + + assertNotNull(e.retryReadyFuture()); + + e.retryReadyFuture().get(); } return null; } }, "tx-thread"); - crdSpi.waitForBlocked(); + srv1Spi.waitForBlocked(); - stopGrid(0); + assertFalse(fut.isDone()); + + stopGrid(1); fut.get(); assertNull(cache.get(key1)); assertNull(cache.get(key2)); - try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = client.transactions().txStart(concurrency, isolation)) { cache.put(key1, 1); cache.put(key2, 2); @@ -1994,32 +2790,165 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception { - for (int i = 1; i <= 3; i++) { - readInProgressCoordinatorFailsSimple(false, i); + public void testSerializableTxRemap() throws Exception { + testSpi = true; - afterTest(); + startGrids(2); + + client = true; + + final Ignite client = startGrid(2); + + final IgniteCache cache = client.createCache( + cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)); + + final Map<Object, Object> vals = new HashMap<>(); + + for (int i = 0; i < 100; i++) + vals.put(i, i); + + TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(ignite(2)); + + clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridNearTxPrepareRequest; + } + }); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.putAll(vals); + + tx.commit(); + } + + return null; + } + }, "tx-thread"); + + clientSpi.waitForBlocked(2); + + this.client = false; + + startGrid(3); + + assertFalse(fut.isDone()); + + clientSpi.stopBlock(); + + fut.get(); + + for (Ignite node : G.allGrids()) + checkValues(vals, node.cache(cache.getName())); + } + + /** + * @throws Exception If failed. + */ + public void testTxInProgressCoordinatorChangeSimple() throws Exception { + txInProgressCoordinatorChangeSimple(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxInProgressCoordinatorChangeSimple_Readonly() throws Exception { + txInProgressCoordinatorChangeSimple(true); + } + + /** + * @param readOnly If {@code true} tests read-only transaction. + * @throws Exception If failed. + */ + private void txInProgressCoordinatorChangeSimple(boolean readOnly) throws Exception { + CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure()); + + Ignite srv0 = startGrids(4); + + client = true; + + startGrid(4); + + client = false; + + nodeAttr = CRD_ATTR; + + int crdIdx = 5; + + startGrid(crdIdx); + + srv0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setNodeFilter(new CoordinatorNodeFilter())); + + Set<Integer> keys = F.asSet(1, 2, 3); + + for (int i = 0; i < 5; i++) { + Ignite node = ignite(i); + + info("Test with node: " + node.name()); + + IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + assertTrue(cache.getAll(keys).isEmpty()); + + if (!readOnly) + cache.put(0, 0); + + startGrid(crdIdx + 1); + + stopGrid(crdIdx); + + crdIdx++; + + tx.commit(); + } + + checkActiveQueriesCleanup(ignite(crdIdx)); } } + + /** + * @throws Exception If failed. + */ + public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception { + readInProgressCoordinatorFailsSimple(false); + } /** * @throws Exception If failed. */ public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception { - for (int i = 1; i <= 3; i++) { - readInProgressCoordinatorFailsSimple(true, i); + readInProgressCoordinatorFailsSimple(true); + } - afterTest(); + /** + * @param fromClient {@code True} if read from client node, otherwise from server node. + * @throws Exception If failed. + */ + private void readInProgressCoordinatorFailsSimple(boolean fromClient) throws Exception { + for (boolean readInTx : new boolean[]{false, true}) { + for (int i = 1; i <= 3; i++) { + readInProgressCoordinatorFailsSimple(fromClient, i, readInTx); + + afterTest(); + } } } /** * @param fromClient {@code True} if read from client node, otherwise from server node. * @param crdChangeCnt Number of coordinator changes. + * @param readInTx {@code True} to read inside transaction. * @throws Exception If failed. */ - private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt) throws Exception { - info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient + ", crdChangeCnt=" + crdChangeCnt + ']'); + private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt, final boolean readInTx) + throws Exception + { + info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient + + ", crdChangeCnt=" + crdChangeCnt + + ", readInTx=" + readInTx + ']'); testSpi = true; @@ -2072,7 +3001,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { IgniteInternalFuture getFut = GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { - Map<Integer, Integer> res = cache.getAll(keys); + Map<Integer, Integer> res; + + if (readInTx) { + try (Transaction tx = getNode.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + res = cache.getAll(keys); + + tx.rollback(); + } + } + else + res = cache.getAll(keys); assertEquals(20, res.size()); @@ -2197,21 +3136,36 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testReadInProgressCoordinatorFails() throws Exception { - readInProgressCoordinatorFails(false); + readInProgressCoordinatorFails(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testReadInsideTxInProgressCoordinatorFails() throws Exception { + readInProgressCoordinatorFails(false, true); } /** * @throws Exception If failed. */ public void testReadInProgressCoordinatorFails_ReadDelay() throws Exception { - readInProgressCoordinatorFails(true); + readInProgressCoordinatorFails(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testReadInsideTxInProgressCoordinatorFails_ReadDelay() throws Exception { + readInProgressCoordinatorFails(true, true); } /** * @param readDelay {@code True} if delays get requests. + * @param readInTx {@code True} to read inside transaction. * @throws Exception If failed. */ - private void readInProgressCoordinatorFails(boolean readDelay) throws Exception { + private void readInProgressCoordinatorFails(boolean readDelay, final boolean readInTx) throws Exception { final int COORD_NODES = 5; final int SRV_NODES = 4; @@ -2284,7 +3238,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (String cacheName : cacheNames) { IgniteCache cache = node.cache(cacheName); - Map<Integer, Integer> res = cache.getAll(vals.keySet()); + Map<Integer, Integer> res; + + if (readInTx) { + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + res = cache.getAll(vals.keySet()); + + tx.rollback(); + } + } + else + res = cache.getAll(vals.keySet()); assertEquals(vals.size(), res.size()); @@ -2311,7 +3275,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { throw e; } } - }, (SRV_NODES + 1) + 1, "get-thread"); + }, ((SRV_NODES + 1) + 1) * 2, "get-thread"); IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new Callable() { @Override public Void call() throws Exception { @@ -2468,7 +3432,19 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < 10; i++) vals.put(i, val); - try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + TransactionConcurrency concurrency; + TransactionIsolation isolation; + + if (ThreadLocalRandom.current().nextBoolean()) { + concurrency = PESSIMISTIC; + isolation = REPEATABLE_READ; + } + else { + concurrency = OPTIMISTIC; + isolation = SERIALIZABLE; + } + + try (Transaction tx = putNode.transactions().txStart(concurrency, isolation)) { for (String cacheName : cacheNames) putNode.cache(cacheName).putAll(vals); @@ -2525,7 +3501,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { MvccCoordinator crd = null; for (Ignite node : G.allGrids()) { - CacheCoordinatorsProcessor crdProc = ((IgniteKernal) node).context().cache().context().coordinators(); + CacheCoordinatorsProcessor crdProc = ((IgniteKernal)node).context().cache().context().coordinators(); MvccCoordinator crd0 = crdProc.currentCoordinator(); @@ -2704,16 +3680,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int cnt = 0; while (!stop.get()) { - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); TreeSet<Integer> keys = new TreeSet<>(); @@ -2721,7 +3697,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { keys.add(rnd.nextInt(TOTAL)); try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Map<Integer, Integer> curVals = cache.getAll(keys); + Map<Integer, Integer> curVals = cache.cache.getAll(keys); assertEquals(N, curVals.size()); @@ -2730,10 +3706,13 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (Map.Entry<Integer, Integer> e : curVals.entrySet()) newVals.put(e.getKey(), e.getValue() + 1); - cache.putAll(newVals); + cache.cache.putAll(newVals); tx.commit(); } + finally { + cache.readUnlock(); + } cnt++; } @@ -2742,9 +3721,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); Set<Integer> keys = new LinkedHashSet<>(); @@ -2753,9 +3732,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (keys.size() < TOTAL) keys.add(rnd.nextInt(TOTAL)); - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); - Map<Integer, Integer> vals = cache.getAll(keys); + Map<Integer, Integer> vals; + + try { + vals = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); + } assertEquals(TOTAL, vals.size()); @@ -2773,9 +3759,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } if (idx == 0) { - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); - Map<Integer, Integer> vals = cache.getAll(keys); + Map<Integer, Integer> vals; + + try { + vals = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); +
<TRUNCATED>
