Repository: ignite Updated Branches: refs/heads/master a029997b3 -> d77fe8153
ignite-4484 Call commit for read-only transactions (reverted changes introduced in ignite-3601). Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d77fe815 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d77fe815 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d77fe815 Branch: refs/heads/master Commit: d77fe815395dcd2871d7d1cd4cb6d978918e616e Parents: a029997 Author: sboikov <[email protected]> Authored: Wed Feb 22 11:47:38 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Feb 22 11:47:38 2017 +0300 ---------------------------------------------------------------------- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../cache/CacheGetEntryAbstractTest.java | 16 +- .../CacheSerializableTransactionsTest.java | 173 +++++++++++++++++-- .../processors/cache/CacheTxFastFinishTest.java | 2 +- 4 files changed, 170 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d77fe815/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 197792b..1c80f5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -959,7 +959,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @return {@code True} if 'fast finish' path can be used for transaction completion. */ private boolean fastFinish() { - return writeMap().isEmpty() && (optimistic() || readMap().isEmpty()); + return writeMap().isEmpty() && ((optimistic() && !serializable()) || readMap().isEmpty()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d77fe815/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java index 86bf70e..3487845 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java @@ -267,25 +267,27 @@ public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTes */ private void testConcurrentOptimisticTxGet(final IgniteCache<Integer, TestValue> cache, final TransactionIsolation txIsolation) throws Exception { + final int key1 = 42; + final int key2 = 43; + + cache.put(key1, new TestValue(key1)); + GridTestUtils.runMultiThreaded(new Runnable() { @Override public void run() { - final int key = 42; - IgniteTransactions txs = grid(0).transactions(); - cache.put(key, new TestValue(key)); + cache.put(key2, new TestValue(key2)); long stopTime = System.currentTimeMillis() + 3000; while (System.currentTimeMillis() < stopTime) { try (Transaction tx = txs.txStart(OPTIMISTIC, txIsolation)) { - cache.get(key); + cache.get(key1); tx.commit(); } - catch (TransactionOptimisticException ignored) { - assertTrue("Should not throw optimistic exception in only read TX. Tx isolation: " - + txIsolation, false); + catch (Exception ignored) { + fail("Unexpected exception: " + ignored); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d77fe815/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index ac61091..0e6d264 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; @@ -751,6 +752,11 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { updateKey(cache, key, 1); tx.commit(); + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); } checkValue(key, 1, cache.getName()); @@ -2917,7 +2923,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { sum += account.value(); } - cache.put(putKey, new Account(sum)); + if (ThreadLocalRandom.current().nextBoolean()) + cache.put(putKey, new Account(sum)); tx.commit(); } @@ -3207,14 +3214,21 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { cache0.put(key2, -1); cache0.put(key3, -1); - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - cache.get(key1); - cache.get(key2); - cache.get(key3); + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.get(key1); + cache.get(key2); + cache.get(key3); - updateKey(near ? cache : cache0, key2, -2); + updateKey(near ? cache : cache0, key2, -2); - tx.commit(); + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); } checkValue(key1, -1, cacheName); @@ -3465,16 +3479,23 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { checkValue(key1, newVal, CACHE1); checkValue(key2, newVal, CACHE2); - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - Object val1 = cache1.get(key1); - Object val2 = cache2.get(key2); + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Object val1 = cache1.get(key1); + Object val2 = cache2.get(key2); - assertEquals(newVal, val1); - assertEquals(newVal, val2); + assertEquals(newVal, val1); + assertEquals(newVal, val2); - updateKey(cache2, key2, newVal); + updateKey(cache2, key2, newVal); - tx.commit(); + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); } try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { @@ -4483,6 +4504,130 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** + * Multithreaded transactional reads. + * + * @throws Exception If failed. + */ + public void testMultipleOptimisticRead() throws Exception { + final Ignite ignite = ignite(0); + final Integer key = 1; + final Integer val = 1; + final int THREADS_CNT = 50; + + final String cacheName = + ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); + + try { + final IgniteCache<Integer, Integer> cache = ignite.cache(cacheName); + + try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, val); + + tx.commit(); + } + + assertTrue(cache.get(key).equals(val)); + + for (int i = 0; i < 10; i++) { + GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + assertTrue(cache.get(key).equals(val)); + + tx.commit(); + + } + return null; + } + }, THREADS_CNT, "multiple-reads-thread").get(); + } + } + finally { + destroyCache(cacheName); + } + } + + /** + * Transactional read in parallel with changing the same data. + * + * @throws Exception If failed. + */ + public void testTxReadInParallerTxWrite() throws Exception { + final Ignite ignite = ignite(0); + final Integer key = 1; + final Integer val = 1; + + final CountDownLatch readLatch = new CountDownLatch(1); + final CountDownLatch writeLatch = new CountDownLatch(1); + + final Exception[] err = {null}; + + final String cacheName = + ignite.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); + + final IgniteCache<Integer, Integer> cache = ignite.cache(cacheName); + + try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, val); + + tx.commit(); + } + + try { + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + assertTrue(cache.get(key).equals(val)); + + readLatch.countDown(); + + writeLatch.await(10, TimeUnit.SECONDS); + + try { + tx.commit(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + + err[0] = e; + } + } + return null; + } + }, "read-thread"); + + GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + readLatch.await(10, TimeUnit.SECONDS); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, val); + + tx.commit(); + } + + writeLatch.countDown(); + + return null; + } + }, "write-thread").get(); + + fut.get(); + + assertNotNull("Expected exception was not thrown", err[0]); + } + finally { + destroyCache(cacheName); + } + } + + /** * @throws Exception If failed. */ public void testConcurrentUpdateNoDeadlock() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/d77fe815/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java index f9c6683..35b1405 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java @@ -173,7 +173,7 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest { try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { cache.get(i); - checkFastTxFinish(tx, commit); + checkNormalTxFinish(tx, commit); } try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
