Repository: ignite Updated Branches: refs/heads/ignite-1607 1c9feb179 -> 59bf1a2e6
http://git-wip-us.apache.org/repos/asf/ignite/blob/59bf1a2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 377e9a8..c86434c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -34,6 +34,7 @@ import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.integration.CacheLoaderException; import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -296,8 +297,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { txAsync(cache, PESSIMISTIC, REPEATABLE_READ, new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { - @Override - public Void apply(IgniteCache<Integer, Integer> cache) { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { cache.get(key); return null; @@ -911,6 +911,100 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed + */ + public void testTxConflictInvokeAll() throws Exception { + Ignite ignite0 = ignite(0); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg); + + final Integer key1 = primaryKey(ignite(0).cache(cache0.getName())); + final Integer key2 = primaryKey(ignite(1).cache(cache0.getName())); + + Map<Integer, Integer> vals = new HashMap<>(); + + int newVal = 0; + + for (Ignite ignite : G.allGrids()) { + log.info("Test node: " + ignite.name()); + + IgniteTransactions txs = ignite.transactions(); + + IgniteCache<Integer, Integer> cache = ignite.cache(cache0.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map<Integer, EntryProcessorResult<Integer>> res = + cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal)); + + if (!vals.isEmpty()) { + EntryProcessorResult<Integer> res1 = res.get(key1); + + assertNotNull(res1); + assertEquals(vals.get(key1), res1.get()); + + EntryProcessorResult<Integer> res2 = res.get(key2); + + assertNotNull(res2); + assertEquals(vals.get(key2), res2.get()); + } + else + assertTrue(res.isEmpty()); + + tx.commit(); + } + + checkValue(key1, newVal, cache.getName()); + checkValue(key2, newVal, cache.getName()); + + vals.put(key1, newVal); + vals.put(key2, newVal); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map<Integer, EntryProcessorResult<Integer>> res = + cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal + 1)); + + EntryProcessorResult<Integer> res1 = res.get(key1); + + assertNotNull(res1); + assertEquals(vals.get(key1), res1.get()); + + EntryProcessorResult<Integer> res2 = res.get(key2); + + assertNotNull(res2); + assertEquals(vals.get(key2), res2.get()); + + updateKey(cache0, key1, -1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key1, -1, cache.getName()); + checkValue(key2, newVal, cache.getName()); + + vals.put(key1, -1); + vals.put(key2, newVal); + + newVal++; + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** * @throws Exception If failed. */ public void testTxConflictPutIfAbsent() throws Exception { @@ -999,6 +1093,92 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testTxConflictGetAndPutIfAbsent() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 2); + + assertNull(old); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 2); + + assertEquals(1, old); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 2); + + assertNull(old); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndPutIfAbsent(key, 4); + + assertEquals(2, old); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ public void testTxConflictReplace() throws Exception { Ignite ignite0 = ignite(0); @@ -1249,52 +1429,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testTxNoConflictPut1() throws Exception { - txNoConflictUpdate(true, false, false); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoConflictPut2() throws Exception { - txNoConflictUpdate(false, false, false); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoConflictPut3() throws Exception { - txNoConflictUpdate(false, false, true); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoConflictRemove1() throws Exception { - txNoConflictUpdate(true, true, false); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoConflictRemove2() throws Exception { - txNoConflictUpdate(false, true, false); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoConflictRemove3() throws Exception { - txNoConflictUpdate(false, true, true); - } - - /** - * @throws Exception If failed. - * @param noVal If {@code true} there is no cache value when do update in tx. - * @param rmv If {@code true} tests remove, otherwise put. - * @param getAfterUpdate If {@code true} tries to get value in tx after update. - */ - private void txNoConflictUpdate(boolean noVal, boolean rmv, boolean getAfterUpdate) throws Exception { + public void testTxConflictRemoveWithOldValue() throws Exception { Ignite ignite0 = ignite(0); final IgniteTransactions txs = ignite0.transactions(); @@ -1307,84 +1442,510 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { List<Integer> keys = testKeys(cache); - for (Integer key : keys) { + for (final Integer key : keys) { log.info("Test key: " + key); - if (!noVal) - cache.put(key, -1); + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - if (rmv) - cache.remove(key); - else - cache.put(key, 2); + assertFalse(rmv); - if (getAfterUpdate) { - Object val = cache.get(key); + updateKey(cache, key, 1); - if (rmv) - assertNull(val); - else - assertEquals(2, val); + tx.commit(); } - updateKey(cache, key, 1); - - tx.commit(); + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); } - checkValue(key, rmv ? null : 2, cache.getName()); + checkValue(key, 1, cache.getName()); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key, 3); + boolean rmv = cache.remove(key, 1); + + assertTrue(rmv); tx.commit(); } - checkValue(key, 3, cache.getName()); - } - - Map<Integer, Integer> map = new HashMap<>(); + checkValue(key, null, cache.getName()); - for (int i = 0; i < 100; i++) - map.put(i, i); + cache.remove(key); - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - if (rmv) - cache.removeAll(map.keySet()); - else - cache.putAll(map); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); - if (getAfterUpdate) { - Map<Integer, Integer> res = cache.getAll(map.keySet()); + assertFalse(rmv); - if (rmv) { - for (Integer key : map.keySet()) - assertNull(res.get(key)); - } - else { - for (Integer key : map.keySet()) - assertEquals(map.get(key), res.get(key)); - } + tx.commit(); } - txAsync(cache, PESSIMISTIC, REPEATABLE_READ, - new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { - @Override public Void apply(IgniteCache<Integer, Integer> cache) { - Map<Integer, Integer> map = new HashMap<>(); + checkValue(key, null, cache.getName()); - for (int i = 0; i < 100; i++) - map.put(i, -1); + cache.put(key, 2); - cache.putAll(map); + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); - return null; - } - } - ); + assertTrue(rmv); - tx.commit(); - } + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 3); + + assertTrue(rmv); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 2); + + assertFalse(rmv); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean rmv = cache.remove(key, 1); + + assertTrue(rmv); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictCasReplace() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 2); + + assertFalse(replace); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 2); + + assertTrue(replace); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 2); + + assertFalse(replace); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 2); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2, 1); + + assertTrue(replace); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 3, 4); + + assertTrue(replace); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { + cache.remove(key); + + return null; + } + } + ); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2, 3); + + assertFalse(replace); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 1, 3); + + assertTrue(replace); + + tx.commit(); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRemoveReturnBoolean1() throws Exception { + txConflictRemoveReturnBoolean(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRemoveReturnBoolean2() throws Exception { + txConflictRemoveReturnBoolean(true); + } + + /** + * @param noVal If {@code true} there is no cache value when do update in tx. + * @throws Exception If failed. + */ + private void txConflictRemoveReturnBoolean(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + if (!noVal) + cache.put(key, -1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertEquals(!noVal, res); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertTrue(res); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + // Check no conflict for removeAll with single key. + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.removeAll(Collections.singleton(key)); + + updateKey(cache, key, 1); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertTrue(res); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.remove(key); + + assertFalse(res); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictPut1() throws Exception { + txNoConflictUpdate(true, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictPut2() throws Exception { + txNoConflictUpdate(false, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictPut3() throws Exception { + txNoConflictUpdate(false, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove1() throws Exception { + txNoConflictUpdate(true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove2() throws Exception { + txNoConflictUpdate(false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove3() throws Exception { + txNoConflictUpdate(false, true, true); + } + + /** + * @throws Exception If failed. + * @param noVal If {@code true} there is no cache value when do update in tx. + * @param rmv If {@code true} tests remove, otherwise put. + * @param getAfterUpdate If {@code true} tries to get value in tx after update. + */ + private void txNoConflictUpdate(boolean noVal, boolean rmv, boolean getAfterUpdate) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + if (!noVal) + cache.put(key, -1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (rmv) + cache.remove(key); + else + cache.put(key, 2); + + if (getAfterUpdate) { + Object val = cache.get(key); + + if (rmv) + assertNull(val); + else + assertEquals(2, val); + } + + if (!rmv) + updateKey(cache, key, 1); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 3); + + tx.commit(); + } + + checkValue(key, 3, cache.getName()); + } + + Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (rmv) + cache.removeAll(map.keySet()); + else + cache.putAll(map); + + if (getAfterUpdate) { + Map<Integer, Integer> res = cache.getAll(map.keySet()); + + if (rmv) { + for (Integer key : map.keySet()) + assertNull(res.get(key)); + } + else { + for (Integer key : map.keySet()) + assertEquals(map.get(key), res.get(key)); + } + } + + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { + Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, -1); + + cache.putAll(map); + + return null; + } + } + ); + + tx.commit(); + } for (int i = 0; i < 100; i++) checkValue(i, rmv ? null : i, cache.getName()); @@ -1398,6 +1959,96 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testTxNoConflictContainsKey1() throws Exception { + txNoConflictContainsKey(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictContainsKey2() throws Exception { + txNoConflictContainsKey(true); + } + + /** + * @param noVal If {@code true} there is no cache value when do update in tx. + * @throws Exception If failed. + */ + private void txNoConflictContainsKey(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + if (!noVal) + cache.put(key, -1); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertEquals(!noVal, res); + + updateKey(cache, key, 1); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertTrue(res); + + updateKey(cache, key, 2); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertTrue(res); + + tx.commit(); + } + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean res = cache.containsKey(key); + + assertFalse(res); + + updateKey(cache, key, 3); + + tx.commit(); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ public void testTxRollbackIfLocked1() throws Exception { Ignite ignite0 = ignite(0); @@ -1744,6 +2395,360 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCrossCacheTx() throws Exception { + Ignite ignite0 = ignite(0); + + final String CACHE1 = "cache1"; + final String CACHE2 = "cache2"; + + try { + CacheConfiguration<Integer, Integer> ccfg1 = + cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + + ccfg1.setName(CACHE1); + + ignite0.createCache(ccfg1); + + CacheConfiguration<Integer, Integer> ccfg2= + cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + + ccfg2.setName(CACHE2); + + ignite0.createCache(ccfg2); + + Integer newVal = 0; + + List<Integer> keys = testKeys(ignite0.<Integer, Integer>cache(CACHE1)); + + for (Ignite ignite : G.allGrids()) { + log.info("Test node: " + ignite.name()); + + IgniteCache<Integer, Integer> cache1 = ignite.cache(CACHE1); + IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2); + + IgniteTransactions txs = ignite.transactions(); + + for (Integer key : keys) { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key, newVal); + cache2.put(key, newVal); + + tx.commit(); + } + + checkValue(key, newVal, CACHE1); + checkValue(key, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key); + Object val2 = cache2.get(key); + + assertEquals(newVal, val1); + assertEquals(newVal, val2); + + tx.commit(); + } + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key, newVal + 1); + cache2.put(key, newVal + 1); + + tx.rollback(); + } + + checkValue(key, newVal, CACHE1); + checkValue(key, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key); + Object val2 = cache2.get(key); + + assertEquals(newVal, val1); + assertEquals(newVal, val2); + + cache1.put(key, newVal + 1); + cache2.put(key, newVal + 1); + + tx.commit(); + } + + newVal++; + + checkValue(key, newVal, CACHE1); + checkValue(key, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key, newVal); + cache2.put(-key, newVal); + + tx.commit(); + } + + checkValue(key, newVal, CACHE1); + checkValue(-key, null, CACHE1); + + checkValue(key, newVal, CACHE2); + checkValue(-key, newVal, CACHE2); + } + + newVal++; + + Integer key1 = primaryKey(ignite(0).cache(CACHE1)); + Integer key2 = primaryKey(ignite(1).cache(CACHE1)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal); + cache1.put(key2, newVal); + + cache2.put(key1, newVal); + cache2.put(key2, newVal); + + tx.commit(); + } + + checkValue(key1, newVal, CACHE1); + checkValue(key2, newVal, CACHE1); + checkValue(key1, newVal, CACHE2); + checkValue(key2, newVal, CACHE2); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = lockKey(latch, cache1, key1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal + 1); + cache2.put(key1, newVal + 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + checkValue(key1, 1, CACHE1); + checkValue(key1, newVal, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal + 1); + cache2.put(key1, newVal + 1); + + tx.commit(); + } + + newVal++; + + cache1.put(key2, newVal); + cache2.put(key2, newVal); + + checkValue(key1, newVal, CACHE1); + checkValue(key1, newVal, CACHE2); + + latch = new CountDownLatch(1); + + fut = lockKey(latch, cache1, key1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache1.put(key1, newVal + 1); + cache2.put(key2, newVal + 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + checkValue(key1, 1, CACHE1); + checkValue(key2, newVal, CACHE2); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key1); + Object val2 = cache2.get(key2); + + assertEquals(1, val1); + assertEquals(newVal, val2); + + updateKey(cache2, key2, 1); + + cache1.put(key1, newVal + 1); + cache2.put(key2, newVal + 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key1, 1, CACHE1); + checkValue(key2, 1, CACHE2); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key1); + Object val2 = cache2.get(key2); + + assertEquals(1, val1); + assertEquals(1, val2); + + cache1.put(key1, newVal + 1); + cache2.put(key2, newVal + 1); + + tx.commit(); + } + + newVal++; + + checkValue(key1, newVal, CACHE1); + checkValue(key2, newVal, CACHE2); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key1); + Object val2 = cache2.get(key2); + + assertEquals(newVal, val1); + assertEquals(newVal, val2); + + updateKey(cache2, key2, newVal); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key1); + Object val2 = cache2.get(key2); + + assertEquals(newVal, val1); + assertEquals(newVal, val2); + + tx.commit(); + } + } + } + finally { + ignite0.destroyCache(CACHE1); + ignite0.destroyCache(CACHE2); + } + } + + /** + * @throws Exception If failed. + */ + public void testRandomOperations() throws Exception { + Ignite ignite0 = ignite(0); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (Ignite ignite : G.allGrids()) { + log.info("Test node: " + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + IgniteTransactions txs = ignite.transactions(); + + final int KEYS = 100; + + for (int i = 0; i < 1000; i++) { + Integer key1 = rnd.nextInt(KEYS); + + Integer key2; + + if (rnd.nextBoolean()) { + key2 = rnd.nextInt(KEYS); + + while (key2.equals(key1)) + key2 = rnd.nextInt(KEYS); + } + else + key2 = key1 + 1; + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + randomOperation(rnd, cache, key1); + randomOperation(rnd, cache, key2); + + tx.commit(); + } + } + + for (int key = 0; key < KEYS; key++) { + Integer val = cache0.get(key); + + for (int node = 1; node < SRVS + CLIENTS; node++) + assertEquals(val, ignite(node).cache(cache.getName()).get(key)); + } + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @param rnd Random. + * @param cache Cache. + * @param key Key. + */ + private void randomOperation(ThreadLocalRandom rnd, IgniteCache<Integer, Integer> cache, Integer key) { + switch (rnd.nextInt(4)) { + case 0: + cache.put(key, rnd.nextInt()); + + break; + + case 1: + cache.remove(key); + + break; + + case 2: + cache.invoke(key, new SetValueProcessor(rnd.nextBoolean() ? 1 : null)); + + break; + + case 3: + cache.get(key); + + break; + + default: + assert false; + } + } + + /** + * @throws Exception If failed. + */ public void testAccountTx1() throws Exception { accountTx(false, false, TestMemoryMode.HEAP); } @@ -1958,28 +2963,35 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConcurrentUpdateNoDeadlock() throws Exception { - concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false); + concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlockGetPut() throws Exception { + concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, true, false); } /** * @throws Exception If failed. */ public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception { - concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true); + concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, false, true); } /** * @throws Exception If failed. */ public void testConcurrentUpdateNoDeadlockFromClients() throws Exception { - concurrentUpdateNoDeadlock(clients(), 20, false); + concurrentUpdateNoDeadlock(clients(), 20, false, false); } /** * @throws Exception If failed. */ public void testConcurrentUpdateNoDeadlockFromClientsNodeRestart() throws Exception { - concurrentUpdateNoDeadlock(clients(), 20, true); + concurrentUpdateNoDeadlock(clients(), 20, false, true); } /** @@ -2002,11 +3014,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @param updateNodes Nodes executing updates. * @param threads Number of threads executing updates. + * @param get If {@code true} gets value in transaction. * @param restart If {@code true} restarts one node. * @throws Exception If failed. */ private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes, int threads, + final boolean get, final boolean restart) throws Exception { if (FAST) return; @@ -2077,6 +3091,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { if (restart) { doInTransaction(node, OPTIMISTIC, SERIALIZABLE, new Callable<Void>() { @Override public Void call() throws Exception { + if (get) + cache.getAll(keys.keySet()); + cache.putAll(keys); return null; @@ -2085,6 +3102,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } else { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (get) + cache.getAll(keys.keySet()); + cache.putAll(keys); tx.commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/59bf1a2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java index 1495a2b..6c58145 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java @@ -52,6 +52,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * @@ -190,6 +191,11 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, false); txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, true); + + if (writeSync == FULL_SYNC) { + txOperations(OPTIMISTIC, SERIALIZABLE, crossCacheTx, false); + txOperations(OPTIMISTIC, SERIALIZABLE, crossCacheTx, true); + } } finally { ignite.destroyCache(CACHE1);
