Repository: ignite Updated Branches: refs/heads/ignite-5932 b6b790aa2 -> 8745522b7
ignite-5932 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8745522b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8745522b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8745522b Branch: refs/heads/ignite-5932 Commit: 8745522b73f9916a9adf57f14006caee69071017 Parents: b6b790a Author: sboikov <sboi...@gridgain.com> Authored: Mon Oct 16 13:05:08 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Oct 16 14:09:11 2017 +0300 ---------------------------------------------------------------------- ...arOptimisticSerializableTxPrepareFuture.java | 32 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 738 +++++++++++++------ 2 files changed, 545 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8745522b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 6c3bd13..274bf21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -191,6 +191,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim tx.removeMapping(m.primary().id()); } + prepareError(e); + } + + /** + * @param e Error. + */ + private void prepareError(Throwable e) { ERR_UPD.compareAndSet(this, null, e); if (keyLockFut != null) @@ -733,21 +740,20 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).primary().id() + - ", loc=" + ((MiniFuture)f).primary().isLocal() + - ", done=" + f.isDone() + "]"; - } - }, - new P1<IgniteInternalFuture<?>>() { - @Override public boolean apply(IgniteInternalFuture<?> f) { - return isMini(f); + if (isMini(f)) { + return "[node=" + ((MiniFuture)f).primary().id() + + ", loc=" + ((MiniFuture)f).primary().isLocal() + + ", done=" + f.isDone() + + ", err=" + f.error() + "]"; + } + else + return f.toString(); } }); return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this, "innerFuts", futs, - "keyLockFut", keyLockFut, - "mvccVerFut", mvccVerFut, + "remap", remapFut != null, "tx", tx, "super", super.toString()); } @@ -955,7 +961,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim remap(res); } catch (IgniteCheckedException e) { - ERR_UPD.compareAndSet(parent, null, e); + parent.prepareError(e); onDone(e); } @@ -968,7 +974,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim err0.retryReadyFuture(affFut); - ERR_UPD.compareAndSet(parent, null, err0); + parent.prepareError(err0); onDone(err0); } @@ -979,7 +985,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim parent); } - ERR_UPD.compareAndSet(parent, null, e); + parent.prepareError(e); onDone(e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8745522b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index f44be3f..7faa171 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -27,6 +27,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -35,6 +36,8 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -89,6 +92,7 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionOptimisticException; import org.jetbrains.annotations.Nullable; +import org.junit.Assert; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -1351,46 +1355,53 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode() throws Exception { - putAllGetAll(false, 1, 0, 0, 64); + putAllGetAll(null, 1, 0, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode_SinglePartition() throws Exception { - putAllGetAll(false, 1, 0, 0, 1); + putAllGetAll(null, 1, 0, 0, 1); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups0() throws Exception { - putAllGetAll(false, 4, 2, 0, 64); + putAllGetAll(null, 4, 2, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups1() throws Exception { - putAllGetAll(false, 4, 2, 1, 64); + putAllGetAll(null, 4, 2, 1, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups2() throws Exception { - putAllGetAll(false, 4, 2, 2, 64); + putAllGetAll(null, 4, 2, 2, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator() throws Exception { - putAllGetAll(true, 4, 2, 1, 64); + putAllGetAll(RestartMode.RESTART_CRD, 4, 2, 1, 64); } /** - * @param restartCrd Coordinator restart flag. + * @throws Exception If failed. + */ + public void testPutAllGetAll_ClientServer_Backups1_Restart() throws Exception { + putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 2, 1, 64); + } + + /** + * @param restartMode Restart mode. * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -1398,7 +1409,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void putAllGetAll( - boolean restartCrd, + RestartMode restartMode, final int srvs, final int clients, int cacheBackups, @@ -1411,9 +1422,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final int readers = 4; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int min = idx * RANGE; @@ -1431,30 +1442,35 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (map.size() < RANGE) map.put(rnd.nextInt(min, max), v); - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - if (updated && rnd.nextBoolean()) { - Map<Integer, Integer> res = cache.getAll(map.keySet()); + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + if (updated && rnd.nextBoolean()) { + Map<Integer, Integer> res = cache.cache.getAll(map.keySet()); - for (Integer k : map.keySet()) - assertEquals(v - 1, (Object)res.get(k)); - } + for (Integer k : map.keySet()) + assertEquals(v - 1, (Object)res.get(k)); + } - cache.putAll(map); + cache.cache.putAll(map); - tx.commit(); + tx.commit(); - updated = true; - } + updated = true; + } - if (rnd.nextBoolean()) { - Map<Integer, Integer> res = cache.getAll(map.keySet()); + if (rnd.nextBoolean()) { + Map<Integer, Integer> res = cache.cache.getAll(map.keySet()); - for (Integer k : map.keySet()) - assertEquals(v, (Object)res.get(k)); + for (Integer k : map.keySet()) + assertEquals(v, (Object)res.get(k)); + } + } + finally { + cache.readUnlock(); } map.clear(); @@ -1466,9 +1482,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); Set<Integer> keys = new LinkedHashSet<>(); @@ -1484,9 +1500,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (keys.size() < RANGE) keys.add(rnd.nextInt(min, max)); - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + + Map<Integer, Integer> map; - Map<Integer, Integer> map = cache.getAll(keys); + try { + map = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); + } assertTrue("Invalid map size: " + map.size(), map.isEmpty() || map.size() == RANGE); @@ -1523,7 +1546,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { }; readWriteTest( - restartCrd, + restartMode, srvs, clients, cacheBackups, @@ -1634,109 +1657,115 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final Set<Integer> rmvdIds = new HashSet<>(); - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int cnt = 0; while (!stop.get()) { - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - cnt++; + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - Integer id1 = rnd.nextInt(ACCOUNTS); - Integer id2 = rnd.nextInt(ACCOUNTS); + cnt++; - while (id1.equals(id2)) - id2 = rnd.nextInt(ACCOUNTS); + Integer id1 = rnd.nextInt(ACCOUNTS); + Integer id2 = rnd.nextInt(ACCOUNTS); - TreeSet<Integer> keys = new TreeSet<>(); + while (id1.equals(id2)) + id2 = rnd.nextInt(ACCOUNTS); - keys.add(id1); - keys.add(id2); + TreeSet<Integer> keys = new TreeSet<>(); - Integer cntr1 = null; - Integer cntr2 = null; + keys.add(id1); + keys.add(id2); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - MvccTestAccount a1; - MvccTestAccount a2; + Integer cntr1 = null; + Integer cntr2 = null; - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + MvccTestAccount a1; + MvccTestAccount a2; - a1 = accounts.get(id1); - a2 = accounts.get(id2); + Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys); - if (!withRmvs) { - assertNotNull(a1); - assertNotNull(a2); + a1 = accounts.get(id1); + a2 = accounts.get(id2); - cntr1 = a1.updateCnt + 1; - cntr2 = a2.updateCnt + 1; + if (!withRmvs) { + assertNotNull(a1); + assertNotNull(a2); - cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1)); - cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2)); - } - else { - if (a1 != null || a2 != null) { - if (a1 != null && a2 != null) { - Integer rmvd = null; + cntr1 = a1.updateCnt + 1; + cntr2 = a2.updateCnt + 1; + + cache.cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2)); + } + else { + if (a1 != null || a2 != null) { + if (a1 != null && a2 != null) { + Integer rmvd = null; - if (rnd.nextInt(10) == 0) { - synchronized (rmvdIds) { - if (rmvdIds.size() < ACCOUNTS / 2) { - rmvd = rnd.nextBoolean() ? id1 : id2; + if (rnd.nextInt(10) == 0) { + synchronized (rmvdIds) { + if (rmvdIds.size() < ACCOUNTS / 2) { + rmvd = rnd.nextBoolean() ? id1 : id2; - assertTrue(rmvdIds.add(rmvd)); + assertTrue(rmvdIds.add(rmvd)); + } } } - } - if (rmvd != null) { - cache.remove(rmvd); + if (rmvd != null) { + cache.cache.remove(rmvd); - cache.put(rmvd.equals(id1) ? id2 : id1, - new MvccTestAccount(a1.val + a2.val, 1)); + cache.cache.put(rmvd.equals(id1) ? id2 : id1, + new MvccTestAccount(a1.val + a2.val, 1)); + } + else { + cache.cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); + } } else { - cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); - cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); - } - } - else { - if (a1 == null) { - cache.put(id1, new MvccTestAccount(100, 1)); - cache.put(id2, new MvccTestAccount(a2.val - 100, 1)); + if (a1 == null) { + cache.cache.put(id1, new MvccTestAccount(100, 1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 100, 1)); - assertTrue(rmvdIds.remove(id1)); - } - else { - cache.put(id1, new MvccTestAccount(a1.val - 100, 1)); - cache.put(id2, new MvccTestAccount(100, 1)); + assertTrue(rmvdIds.remove(id1)); + } + else { + cache.cache.put(id1, new MvccTestAccount(a1.val - 100, 1)); + cache.cache.put(id2, new MvccTestAccount(100, 1)); - assertTrue(rmvdIds.remove(id2)); + assertTrue(rmvdIds.remove(id2)); + } } } } - } - tx.commit(); - } + tx.commit(); + } - if (!withRmvs) { - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + if (!withRmvs) { + Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys); - MvccTestAccount a1 = accounts.get(id1); - MvccTestAccount a2 = accounts.get(id2); + MvccTestAccount a1 = accounts.get(id1); + MvccTestAccount a2 = accounts.get(id2); - assertNotNull(a1); - assertNotNull(a2); + assertNotNull(a1); + assertNotNull(a2); - assertTrue(a1.updateCnt >= cntr1); - assertTrue(a2.updateCnt >= cntr2); + assertTrue(a1.updateCnt >= cntr1); + assertTrue(a2.updateCnt >= cntr2); + } + } + finally { + cache.readUnlock(); } } @@ -1744,9 +1773,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); Set<Integer> keys = new LinkedHashSet<>(); @@ -1757,21 +1786,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (keys.size() < ACCOUNTS) keys.add(rnd.nextInt(ACCOUNTS)); - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); Map<Integer, MvccTestAccount> accounts; - if (readMode == ReadMode.SCAN) { - accounts = new HashMap<>(); + try { + if (readMode == ReadMode.SCAN) { + accounts = new HashMap<>(); - for (IgniteCache.Entry<Integer, MvccTestAccount> e : cache) { - MvccTestAccount old = accounts.put(e.getKey(), e.getValue()); + for (IgniteCache.Entry<Integer, MvccTestAccount> e : cache.cache) { + MvccTestAccount old = accounts.put(e.getKey(), e.getValue()); - assertNull(old); + assertNull(old); + } } + else + accounts = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); } - else - accounts = cache.getAll(keys); if (!withRmvs) assertEquals(ACCOUNTS, accounts.size()); @@ -1799,9 +1833,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } if (idx == 0) { - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + + Map<Integer, MvccTestAccount> accounts; - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + try { + accounts = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); + } int sum = 0; @@ -1822,7 +1863,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { }; readWriteTest( - false, + null, srvs, clients, cacheBackups, @@ -1922,46 +1963,52 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int cnt = 0; while (!stop.get()) { - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - cnt++; + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - Integer id1 = rnd.nextInt(ACCOUNTS); - Integer id2 = rnd.nextInt(ACCOUNTS); + cnt++; - while (id1.equals(id2)) - id2 = rnd.nextInt(ACCOUNTS); + Integer id1 = rnd.nextInt(ACCOUNTS); + Integer id2 = rnd.nextInt(ACCOUNTS); - TreeSet<Integer> keys = new TreeSet<>(); + while (id1.equals(id2)) + id2 = rnd.nextInt(ACCOUNTS); - keys.add(id1); - keys.add(id2); + TreeSet<Integer> keys = new TreeSet<>(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - MvccTestAccount a1; - MvccTestAccount a2; + keys.add(id1); + keys.add(id2); - Map<Integer, MvccTestAccount> accounts = cache.getAll(keys); + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + MvccTestAccount a1; + MvccTestAccount a2; - a1 = accounts.get(id1); - a2 = accounts.get(id2); + Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys); - assertNotNull(a1); - assertNotNull(a2); + a1 = accounts.get(id1); + a2 = accounts.get(id2); - cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); - cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); + assertNotNull(a1); + assertNotNull(a2); - tx.commit(); + cache.cache.put(id1, new MvccTestAccount(a1.val + 1, 1)); + cache.cache.put(id2, new MvccTestAccount(a2.val - 1, 1)); + + tx.commit(); + } + } + finally { + cache.readUnlock(); } } @@ -1969,16 +2016,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int cnt = 0; while (!stop.get()) { - IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); Map<Integer, MvccTestAccount> accounts = new HashMap<>(); @@ -1994,7 +2041,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < readCnt; i++) readKeys.add(accounts.size() + i); - Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys); + Map<Integer, MvccTestAccount> readRes = cache.cache.getAll(readKeys); assertEquals(readCnt, readRes.size()); @@ -2010,6 +2057,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { cnt++; } + finally { + cache.readUnlock(); + } } else { try (Transaction tx = txs.txStart(concurrency, isolation)) { @@ -2022,7 +2072,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < readCnt; i++) { Integer key = rnd.nextInt(ACCOUNTS); - MvccTestAccount account = cache.get(key); + MvccTestAccount account = cache.cache.get(key); assertNotNull(account); @@ -2035,7 +2085,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < readCnt; i++) readKeys.add(rnd.nextInt(ACCOUNTS)); - Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys); + Map<Integer, MvccTestAccount> readRes = cache.cache.getAll(readKeys); assertEquals(readKeys.size(), readRes.size()); @@ -2055,6 +2105,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { catch (TransactionOptimisticException ignore) { // No-op. } + finally { + cache.readUnlock(); + } } } @@ -2080,7 +2133,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { }; readWriteTest( - false, + null, srvs, clients, cacheBackups, @@ -2158,68 +2211,79 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final AtomicInteger keyCntr = new AtomicInteger(); - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int cnt = 0; while (!stop.get()) { - IgniteCache<Integer, Value> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + TestCache<Integer, Value> cache = randomCache(caches, rnd); - Integer key = keyCntr.incrementAndGet(); + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(key, new Value(idx, cnt++)); + Integer key = keyCntr.incrementAndGet(); - tx.commit(); - } + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.cache.put(key, new Value(idx, cnt++)); - if (key > 1_000_000) - break; + tx.commit(); + } + + if (key > 1_000_000) + break; + } + finally { + cache.readUnlock(); + } } info("Writer finished, updates: " + cnt); } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); while (!stop.get()) { - IgniteCache<Integer, Value> cache = randomCache(caches, rnd); + TestCache<Integer, Value> cache = randomCache(caches, rnd); - Map<Integer, TreeSet<Integer>> vals = new HashMap<>(); + try { + Map<Integer, TreeSet<Integer>> vals = new HashMap<>(); - for (IgniteCache.Entry<Integer, Value> e : cache) { - Value val = e.getValue(); + for (IgniteCache.Entry<Integer, Value> e : cache.cache) { + Value val = e.getValue(); - assertNotNull(val); + assertNotNull(val); - TreeSet<Integer> cntrs = vals.get(val.key); + TreeSet<Integer> cntrs = vals.get(val.key); - if (cntrs == null) - vals.put(val.key, cntrs = new TreeSet<>()); + if (cntrs == null) + vals.put(val.key, cntrs = new TreeSet<>()); - boolean add = cntrs.add(val.cnt); + boolean add = cntrs.add(val.cnt); - assertTrue(add); - } + assertTrue(add); + } - for (TreeSet<Integer> readCntrs : vals.values()) { - for (int i = 0; i < readCntrs.size(); i++) - assertTrue(readCntrs.contains(i)); + for (TreeSet<Integer> readCntrs : vals.values()) { + for (int i = 0; i < readCntrs.size(); i++) + assertTrue(readCntrs.contains(i)); + } + } + finally { + cache.readUnlock(); } } } }; readWriteTest( - false, + null, srvs, clients, cacheBackups, @@ -2233,6 +2297,147 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * TODO IGNITE-3478 enable when recovery is implemented. + * + * @throws Exception If failed. + */ + public void _testNodesRestartNoHang() throws Exception { + final int srvs = 4; + final int clients = 4; + final int writers = 6; + final int readers = 2; + + final int KEYS = 100_000; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Map<Integer, Integer> map = new TreeMap<>(); + + int cnt = 0; + + while (!stop.get()) { + int keys = rnd.nextInt(32) + 1; + + while (map.size() < keys) + map.put(rnd.nextInt(KEYS), cnt); + + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + + try { + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); + + TransactionConcurrency concurrency; + TransactionIsolation isolation; + + switch (rnd.nextInt(3)) { + case 0: { + concurrency = PESSIMISTIC; + isolation = REPEATABLE_READ; + + break; + } + case 1: { + concurrency = OPTIMISTIC; + isolation = REPEATABLE_READ; + + break; + } + case 2: { + concurrency = OPTIMISTIC; + isolation = SERIALIZABLE; + + break; + } + default: { + fail(); + + return; + } + } + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + if (rnd.nextBoolean()) { + Map<Integer, Integer> res = cache.cache.getAll(map.keySet()); + + assertNotNull(res); + } + + cache.cache.putAll(map); + + tx.commit(); + } + catch (TransactionOptimisticException e) { + assertEquals(SERIALIZABLE, isolation); + } + catch (Exception e) { + Assert.assertTrue("Unexpected error: " + e, X.hasCause(e, ClusterTopologyException.class)); + } + } + finally { + cache.readUnlock(); + } + + map.clear(); + + cnt++; + } + + info("Writer done, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Set<Integer> keys = new LinkedHashSet<>(); + + while (!stop.get()) { + int keyCnt = rnd.nextInt(64) + 1; + + while (keys.size() < keyCnt) + keys.add(rnd.nextInt(KEYS)); + + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + + Map<Integer, Integer> map; + + try { + map = cache.cache.getAll(keys); + + assertNotNull(map); + } + finally { + cache.readUnlock(); + } + + keys.clear(); + } + } + }; + + readWriteTest( + RestartMode.RESTART_RND_SRV, + srvs, + clients, + 1, + 256, + writers, + readers, + DFLT_TEST_TIME, + null, + writer, + reader); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); + } + + /** * @throws Exception If failed. */ public void testActiveQueryCleanupOnNodeFailure() throws Exception { @@ -3475,16 +3680,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); int cnt = 0; while (!stop.get()) { - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); - IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); TreeSet<Integer> keys = new TreeSet<>(); @@ -3492,7 +3697,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { keys.add(rnd.nextInt(TOTAL)); try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Map<Integer, Integer> curVals = cache.getAll(keys); + Map<Integer, Integer> curVals = cache.cache.getAll(keys); assertEquals(N, curVals.size()); @@ -3501,10 +3706,13 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (Map.Entry<Integer, Integer> e : curVals.entrySet()) newVals.put(e.getKey(), e.getValue() + 1); - cache.putAll(newVals); + cache.cache.putAll(newVals); tx.commit(); } + finally { + cache.readUnlock(); + } cnt++; } @@ -3513,9 +3721,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader = - new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) { + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); Set<Integer> keys = new LinkedHashSet<>(); @@ -3524,9 +3732,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (keys.size() < TOTAL) keys.add(rnd.nextInt(TOTAL)); - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); - Map<Integer, Integer> vals = cache.getAll(keys); + Map<Integer, Integer> vals; + + try { + vals = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); + } assertEquals(TOTAL, vals.size()); @@ -3544,9 +3759,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } if (idx == 0) { - IgniteCache<Integer, Integer> cache = randomCache(caches, rnd); + TestCache<Integer, Integer> cache = randomCache(caches, rnd); - Map<Integer, Integer> vals = cache.getAll(keys); + Map<Integer, Integer> vals; + + try { + vals = cache.cache.getAll(keys); + } + finally { + cache.readUnlock(); + } int sum = 0; @@ -3564,7 +3786,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { }; readWriteTest( - false, + null, srvs, clients, cacheBackups, @@ -3578,7 +3800,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** - * @param restartCrd If {@code true} dedicated coordinator node is restarted during test. + * @param restartMode Restart mode. * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -3592,7 +3814,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void readWriteTest( - final boolean restartCrd, + final RestartMode restartMode, final int srvs, final int clients, int cacheBackups, @@ -3601,9 +3823,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { final int readers, final long time, IgniteInClosure<IgniteCache<Object, Object>> init, - final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer, - final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader) throws Exception { - if (restartCrd) + final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer, + final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader) throws Exception { + if (restartMode == RestartMode.RESTART_CRD) CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure()); Ignite srv0 = startGridsMultiThreaded(srvs); @@ -3621,14 +3843,14 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { cacheBackups, cacheParts); - if (restartCrd) + if (restartMode == RestartMode.RESTART_CRD) ccfg.setNodeFilter(new CoordinatorNodeFilter()); IgniteCache<Object, Object> cache = srv0.createCache(ccfg); int crdIdx = srvs + clients; - if (restartCrd) { + if (restartMode == RestartMode.RESTART_CRD) { nodeAttr = CRD_ATTR; startGrid(crdIdx); @@ -3637,12 +3859,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { if (init != null) init.apply(cache); - final List<IgniteCache> caches = new ArrayList<>(srvs + clients); + final List<TestCache> caches = new ArrayList<>(srvs + clients); for (int i = 0; i < srvs + clients; i++) { Ignite node = grid(i); - caches.add(node.cache(cache.getName())); + caches.add(new TestCache(node.cache(cache.getName()))); } final long stopTime = U.currentTimeMillis() + time; @@ -3660,7 +3882,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { writer.apply(idx, caches, stop); } catch (Throwable e) { - if (restartCrd && X.hasCause(e, ClusterTopologyException.class)) { + if (restartMode != null && X.hasCause(e, ClusterTopologyException.class)) { log.info("Writer error: " + e); return null; @@ -3701,18 +3923,53 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { while (System.currentTimeMillis() < stopTime && !stop.get()) { Thread.sleep(1000); - if (restartCrd) { - log.info("Start new coordinator: " + (crdIdx + 1)); + if (restartMode != null) { + switch (restartMode) { + case RESTART_CRD: { + log.info("Start new coordinator: " + (crdIdx + 1)); + + startGrid(crdIdx + 1); + + log.info("Stop current coordinator: " + crdIdx); + + stopGrid(crdIdx); + + crdIdx++; + + awaitPartitionMapExchange(); + + break; + } + + case RESTART_RND_SRV: { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int idx = rnd.nextInt(srvs); + + TestCache cache0 = caches.get(idx); + + cache0.stopLock.writeLock().lock(); - startGrid(crdIdx + 1); + log.info("Stop node: " + idx); - log.info("Stop current coordinator: " + crdIdx); + stopGrid(idx); - stopGrid(crdIdx); + log.info("Start new node: " + idx); - crdIdx++; + Ignite srv = startGrid(idx); - awaitPartitionMapExchange(); + synchronized (caches) { + caches.set(idx, new TestCache(srv.cache(DEFAULT_CACHE_NAME))); + } + + awaitPartitionMapExchange(); + + break; + } + + default: + fail(); + } } } @@ -4067,8 +4324,20 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @param rnd Random. * @return Random cache. */ - private static <K, V> IgniteCache<K, V> randomCache(List<IgniteCache> caches, ThreadLocalRandom rnd) { - return caches.size() > 1 ? caches.get(rnd.nextInt(caches.size())): caches.get(0); + private static <K, V> TestCache<K, V> randomCache(List<TestCache> caches, ThreadLocalRandom rnd) { + synchronized (caches) { + if (caches.size() == 1) + return caches.get(0); + + for (;;) { + int idx = rnd.nextInt(caches.size()); + + TestCache testCache = caches.get(idx); + + if (testCache.readLock()) + return testCache; + } + } } /** @@ -4137,6 +4406,19 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * */ + enum RestartMode { + /** + * Dedicated coordinator node is restarted during test. + */ + RESTART_CRD, + + /** */ + RESTART_RND_SRV + } + + /** + * + */ static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> { /** {@inheritDoc} */ @Override public boolean apply(ClusterNode node) { @@ -4211,4 +4493,36 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']'; } } + + /** + * + */ + static class TestCache<K, V> { + /** */ + private final IgniteCache<K, V> cache; + + /** Locks node to avoid node restart while test operation is in progress. */ + private final ReadWriteLock stopLock = new ReentrantReadWriteLock(); + + /** + * @param cache Cache. + */ + TestCache(IgniteCache cache) { + this.cache = cache; + } + + /** + * @return {@code True} if locked. + */ + boolean readLock() { + return stopLock.readLock().tryLock(); + } + + /** + * + */ + void readUnlock() { + stopLock.readLock().unlock(); + } + } }