Repository: ignite Updated Branches: refs/heads/ignite-1607-read 54bbc753d -> 6849ebe10
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index dfe82d4..a620ee5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -19,9 +19,12 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; @@ -30,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.integration.CacheLoaderException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteTransactions; @@ -42,17 +47,22 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionOptimisticException; +import org.jsr166.ConcurrentHashMap8; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -67,6 +77,12 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ + private static final boolean FAST = true; + + /** */ + private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>(); + + /** */ private static final int SRVS = 4; /** */ @@ -79,42 +95,793 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + 
startGridsMultiThreaded(SRVS, CLIENTS); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testTxCommitReadOnly1() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.rollback(); + } + + checkValue(key, null, cache.getName()); + + cache.put(key, 1); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + tx.commit(); + } + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxCommitReadOnly2() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override + public Void apply(IgniteCache<Integer, Integer> cache) { + cache.get(key); + + return null; + } + } + ); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override + public Void apply(IgniteCache<Integer, Integer> cache) { + cache.get(key); + + return null; + } + } + ); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxCommitReadOnlyGetAll() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < 100; i++) + keys.add(i); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map<Integer, Integer> map = cache.getAll(keys); + + assertTrue(map.isEmpty()); + + tx.commit(); + } + + for (Integer key : keys) + checkValue(key, null, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Map<Integer, Integer> map = cache.getAll(keys); + + assertTrue(map.isEmpty()); + + tx.rollback(); + } + + for (Integer key : keys) + checkValue(key, null, cache.getName()); + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRead1() throws Exception { + txConflictRead(true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictRead2() throws Exception { + txConflictRead(false); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @throws Exception If failed. 
+ */ + private void txConflictRead(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = cache.get(key); + + assertEquals(1, val); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadWrite1() throws Exception { + txConflictReadWrite(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadWrite2() throws Exception { + txConflictReadWrite(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadRemove1() throws Exception { + txConflictReadWrite(true, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxConflictReadRemove2() throws Exception { + txConflictReadWrite(false, true); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @param rmv If {@code true} tests remove, otherwise put. + * @throws Exception If failed. 
+ */ + private void txConflictReadWrite(boolean noVal, boolean rmv) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + if (rmv) + cache.remove(key); + else + cache.put(key, 2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(1, (Object) val); + + if (rmv) + cache.remove(key); + else + cache.put(key, 2); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndPut1() throws Exception { + txConflictGetAndPut(true, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndPut2() throws Exception { + txConflictGetAndPut(false, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndRemove1() throws Exception { + txConflictGetAndPut(true, true); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictGetAndRemove2() throws Exception { + txConflictGetAndPut(false, true); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. 
+ * @param rmv If {@code true} tests remove, otherwise put. + * @throws Exception If failed. + */ + private void txConflictGetAndPut(boolean noVal, boolean rmv) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2); + + assertEquals(1, val); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke1() throws Exception { + txConflictInvoke(true, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke2() throws Exception { + txConflictInvoke(false, false); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke3() throws Exception { + txConflictInvoke(true, true); + } + + /** + * @throws Exception If failed + */ + public void testTxConflictInvoke4() throws Exception { + txConflictInvoke(false, true); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. 
+ * @param rmv If {@code true} invoke does remove value, otherwise put. + * @throws Exception If failed. + */ + private void txConflictInvoke(boolean noVal, boolean rmv) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2)); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2)); + + assertEquals(1, val); + + tx.commit(); + } + + checkValue(key, rmv ? null : 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxConflictPutIfAbsent() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertTrue(put); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertFalse(put); + + tx.commit(); + } + + checkValue(key, 1, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertTrue(put); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean put = cache.putIfAbsent(key, 2); + + assertFalse(put); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxConflictReplace() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (final Integer key : keys) { + log.info("Test key: " + key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertFalse(replace); + + updateKey(cache, key, 1); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 1, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertTrue(replace); + + tx.commit(); + } + + checkValue(key, 2, cache.getName()); + + cache.remove(key); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertFalse(replace); + + tx.commit(); + } + + checkValue(key, null, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertFalse(replace); + + updateKey(cache, key, 3); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key, 3, cache.getName()); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); + + assertTrue(replace); - cfg.setClientMode(client); + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { + cache.remove(key); - return cfg; - } + return null; + } + } + ); - /** {@inheritDoc} */ - @Override protected 
void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); + tx.commit(); + } - startGridsMultiThreaded(SRVS); + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } - client = true; + checkValue(key, null, cache.getName()); - startGridsMultiThreaded(SRVS, CLIENTS); + cache.put(key, 1); - client = false; - } + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + boolean replace = cache.replace(key, 2); - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); + assertTrue(replace); - stopAllGrids(); - } + tx.commit(); + } - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 5 * 60_000; + checkValue(key, 2, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } } /** * @throws Exception If failed. */ - public void testTxCommitReadOnly() throws Exception { + public void testTxConflictGetAndReplace() throws Exception { Ignite ignite0 = ignite(0); final IgniteTransactions txs = ignite0.transactions(); @@ -125,99 +892,86 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { try { IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); - List<Integer> keys = new ArrayList<>(); + List<Integer> keys = testKeys(cache); - keys.add(nearKey(cache)); - keys.add(primaryKey(cache)); + for (final Integer key : keys) { + log.info("Test key: " + key); - if (ccfg.getBackups() != 0) - keys.add(backupKey(cache)); + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); - for (Integer key : keys) { - log.info("Test key: " + key); + assertNull(old); - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + updateKey(cache, key, 1); - assertNull(val); + tx.commit(); + } - tx.commit(); + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected 
exception: " + e); } - checkValue(key, null, ccfg.getName()); + checkValue(key, 1, cache.getName()); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + Object old = cache.getAndReplace(key, 2); - assertNull(val); + assertEquals(1, old); - tx.rollback(); + tx.commit(); } - checkValue(key, null, ccfg.getName()); - } - } - finally { - ignite0.destroyCache(ccfg.getName()); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackRead1() throws Exception { - txRollbackRead(true); - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackRead2() throws Exception { - txRollbackRead(false); - } - - /** - * @param noVal If {@code true} there is no cache value when read in tx. - * @throws Exception If failed. - */ - private void txRollbackRead(boolean noVal) throws Exception { - Ignite ignite0 = ignite(0); - - final IgniteTransactions txs = ignite0.transactions(); + checkValue(key, 2, cache.getName()); - List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + cache.remove(key); - ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); - for (CacheConfiguration<Integer, Integer> ccfg : ccfgs) { - logCacheInfo(ccfg); + assertNull(old); - try { - IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + tx.commit(); + } - List<Integer> keys = new ArrayList<>(); + checkValue(key, null, cache.getName()); - keys.add(nearKey(cache)); + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object old = cache.getAndReplace(key, 2); - for (Integer key : keys) { - log.info("Test key: " + key); + assertNull(old); - Integer expVal = null; + updateKey(cache, key, 3); - if (!noVal) { - expVal = -1; + tx.commit(); + } - cache.put(key, expVal); + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected 
exception: " + e); } + checkValue(key, 3, cache.getName()); + try { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + Object old = cache.getAndReplace(key, 2); - assertEquals(expVal, val); + assertEquals(3, old); - updateKey(cache, key, 1); + txAsync(cache, OPTIMISTIC, SERIALIZABLE, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { + cache.remove(key); + + return null; + } + } + ); tx.commit(); } @@ -228,21 +982,23 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { log.info("Expected exception: " + e); } - assertEquals(1, (Object)cache.get(key)); + checkValue(key, null, cache.getName()); + + cache.put(key, 1); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Object val = cache.get(key); + Object old = cache.getAndReplace(key, 2); - assertEquals(1, val); + assertEquals(1, old); tx.commit(); } - assertEquals(1, (Object)cache.get(key)); + checkValue(key, 2, cache.getName()); } } finally { - ignite0.destroyCache(ccfg.getName()); + destroyCache(ignite0, ccfg.getName()); } } } @@ -250,46 +1006,113 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void _testTxRollbackReadWrite() throws Exception { + public void testTxNoConflictPut1() throws Exception { + txNoConflictUpdate(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictPut2() throws Exception { + txNoConflictUpdate(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove1() throws Exception { + txNoConflictUpdate(true, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoConflictRemove2() throws Exception { + txNoConflictUpdate(false, true); + } + + /** + * @throws Exception If failed. 
+ * @param noVal If {@code true} there is no cache value when the update is done in tx. + * @param rmv If {@code true} tests remove, otherwise put. + */ + private void txNoConflictUpdate(boolean noVal, boolean rmv) throws Exception { Ignite ignite0 = ignite(0); final IgniteTransactions txs = ignite0.transactions(); - final IgniteCache<Integer, Integer> cache = - ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); - final Integer key = nearKey(cache); + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Integer val = cache.get(key); + List<Integer> keys = testKeys(cache); - assertNull(val); + for (Integer key : keys) { + log.info("Test key: " + key); - updateKey(cache, key, 1); + if (!noVal) + cache.put(key, -1); - cache.put(key, 2); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (rmv) + cache.remove(key); + else + cache.put(key, 2); - log.info("Commit"); + updateKey(cache, key, 1); - tx.commit(); - } + tx.commit(); + } - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } + checkValue(key, rmv ? 
null : 2, cache.getName()); - assertEquals(1, (Object)cache.get(key)); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 3); - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key, 2); + tx.commit(); + } - tx.commit(); - } + checkValue(key, 3, cache.getName()); + } + + Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (rmv) + cache.removeAll(map.keySet()); + else + cache.putAll(map); + + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, + new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { + Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, -1); + + cache.putAll(map); - assertEquals(2, (Object) cache.get(key)); + return null; + } + } + ); + + tx.commit(); + } + + for (int i = 0; i < 100; i++) + checkValue(i, rmv ? 
null : i, cache.getName()); + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } } /** @@ -306,43 +1129,47 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { try { IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); - final Integer key = nearKey(cache); + List<Integer> keys = testKeys(cache); - CountDownLatch latch = new CountDownLatch(1); + for (Integer key : keys) { + log.info("Test key: " + key); - IgniteInternalFuture<?> fut = lockKey(latch, cache, key); + CountDownLatch latch = new CountDownLatch(1); - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key, 2); + IgniteInternalFuture<?> fut = lockKey(latch, cache, key); - log.info("Commit"); + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); - tx.commit(); + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); } - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } + latch.countDown(); - latch.countDown(); + fut.get(); - fut.get(); + checkValue(key, 1, cache.getName()); - assertEquals(1, (Object)cache.get(key)); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.put(key, 2); + tx.commit(); + } - tx.commit(); + checkValue(key, 2, cache.getName()); } - - assertEquals(2, (Object)cache.get(key)); } finally { - ignite0.destroyCache(ccfg.getName()); + destroyCache(ignite0, ccfg.getName()); } } } @@ -365,7 +1192,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @param locKey If {@code true} gets lock for local key. * @throws Exception If failed. 
*/ - public void rollbackIfLockedPartialLock(boolean locKey) throws Exception { + private void rollbackIfLockedPartialLock(boolean locKey) throws Exception { Ignite ignite0 = ignite(0); final IgniteTransactions txs = ignite0.transactions(); @@ -388,8 +1215,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { cache.put(key1, 2); cache.put(key2, 2); - log.info("Commit2"); - tx.commit(); } @@ -403,23 +1228,21 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { fut.get(); - assertEquals(1, (Object) cache.get(key1)); - assertNull(cache.get(key2)); + checkValue(key1, 1, cache.getName()); + checkValue(key2, null, cache.getName()); try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { cache.put(key1, 2); cache.put(key2, 2); - log.info("Commit3"); - tx.commit(); } - assertEquals(2, (Object) cache.get(key2)); - assertEquals(2, (Object) cache.get(key2)); + checkValue(key1, 2, cache.getName()); + checkValue(key2, 2, cache.getName()); } finally { - ignite0.destroyCache(ccfg.getName()); + destroyCache(ignite0, ccfg.getName()); } } } @@ -511,7 +1334,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { }); } - for (int i = 0; i < 10; i++) { + int ITERS = FAST ? 
1 : 10; + + for (int i = 0; i < ITERS; i++) { log.info("Iteration: " + i); final long stopTime = U.currentTimeMillis() + 10_000; @@ -594,7 +1419,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - ignite(1).destroyCache(cacheName); + destroyCache(ignite(1), cacheName); } } @@ -608,11 +1433,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false)); + ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, false, false)); // Store, no near. ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false)); ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false)); ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false)); + ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, true, false)); // No store, near. ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true)); @@ -640,32 +1467,74 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @param cache Cache. - * @param key Key. - * @param val Value. + * @return Test keys. * @throws Exception If failed. */ - private void updateKey( - final IgniteCache<Integer, Integer> cache, - final Integer key, - final Integer val) throws Exception { + private List<Integer> testKeys(IgniteCache<Integer, Integer> cache) throws Exception { + CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class); + + List<Integer> keys = new ArrayList<>(); + + if (ccfg.getCacheMode() == PARTITIONED) + keys.add(nearKey(cache)); + + keys.add(primaryKey(cache)); + + if (ccfg.getBackups() != 0) + keys.add(backupKey(cache)); + + return keys; + } + + /** + * @param cache Cache. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. 
+ * @param c Closure to run in transaction. + * @throws Exception If failed. + */ + private void txAsync(final IgniteCache<Integer, Integer> cache, + final TransactionConcurrency concurrency, + final TransactionIsolation isolation, + final IgniteClosure<IgniteCache<Integer, Integer>, Void> c) throws Exception { IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { + @Override + public Void call() throws Exception { IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(key, val); + try (Transaction tx = txs.txStart(concurrency, isolation)) { + c.apply(cache); tx.commit(); } return null; } - }, "update-thread"); + }, "async-thread"); fut.get(); } /** + * @param cache Cache. + * @param key Key. + * @param val Value. + * @throws Exception If failed. + */ + private void updateKey( + final IgniteCache<Integer, Integer> cache, + final Integer key, + final Integer val) throws Exception { + txAsync(cache, PESSIMISTIC, REPEATABLE_READ, new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { + cache.put(key, val); + + return null; + } + }); + } + + /** * @param key Key. * @param expVal Expected value. * @param cacheName Cache name. @@ -719,6 +1588,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** + * @param ignite Node. + * @param cacheName Cache name. + */ + private void destroyCache(Ignite ignite, String cacheName) { + storeMap.clear(); + + ignite.destroyCache(cacheName); + } + + /** * @param cacheMode Cache mode. * @param syncMode Write synchronization mode. * @param backups Number of backups. 
@@ -736,9 +1615,11 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { ccfg.setCacheMode(cacheMode); ccfg.setAtomicityMode(TRANSACTIONAL); - ccfg.setBackups(backups); ccfg.setWriteSynchronizationMode(syncMode); + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + if (storeEnabled) { ccfg.setCacheStoreFactory(new TestStoreFactory()); ccfg.setWriteThrough(true); @@ -759,17 +1640,44 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { @Override public CacheStore<Integer, Integer> create() { return new CacheStoreAdapter<Integer, Integer>() { @Override public Integer load(Integer key) throws CacheLoaderException { - return null; + return storeMap.get(key); } @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { - // No-op. + storeMap.put(entry.getKey(), entry.getValue()); } @Override public void delete(Object key) { - // No-op. + storeMap.remove(key); } }; } } + + /** + * Sets given value, returns old value. + */ + public static final class SetValueProcessor implements EntryProcessor<Integer, Integer, Integer> { + /** */ + private Integer newVal; + + /** + * @param newVal New value to set. + */ + SetValueProcessor(Integer newVal) { + this.newVal = newVal; + } + + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... 
arguments) { + Integer val = entry.getValue(); + + if (newVal == null) + entry.remove(); + else + entry.setValue(newVal); + + return val; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index e46c139..6b2a6c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -442,7 +442,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ - @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap, + @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned( + IgniteInternalTx tx, + boolean readSwap, boolean unmarshal, boolean updateMetrics, boolean evt,
