Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 [created] 15432ae94
ignite-1607 restart test Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15432ae9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15432ae9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15432ae9 Branch: refs/heads/ignite-1607 Commit: 15432ae9473779b5b054fe41e94f919698c836fd Parents: 303def3 Author: sboikov <[email protected]> Authored: Thu Oct 29 12:54:07 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 30 12:18:57 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 44 +++--- ...arOptimisticSerializableTxPrepareFuture.java | 12 +- .../cache/transactions/IgniteTxManager.java | 4 +- .../CacheSerializableTransactionsTest.java | 151 +++++++++++++++++++ 4 files changed, 187 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index d806801..61975d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -573,27 +573,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (tx.onePhaseCommit() && tx.commitOnPrepare()) { assert last; + Throwable prepErr = this.err.get(); + // Must create prepare response before transaction is 
committed to grab correct return value. - final GridNearTxPrepareResponse res = createPrepareResponse(); + final GridNearTxPrepareResponse res = createPrepareResponse(prepErr); onComplete(res); if (tx.commitOnPrepare()) { if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { - IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ? - tx.commitAsync() : tx.rollbackAsync(); + IgniteInternalFuture<IgniteInternalTx> fut = null; - fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { - try { - if (replied.compareAndSet(false, true)) - sendPrepareResponse(res); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + if (prepErr == null) + fut = tx.commitAsync(); + else if (!cctx.kernalContext().isStopping()) + fut = tx.rollbackAsync(); + + if (fut != null) { + fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } } - } - }); + }); + } } } else { @@ -610,7 +618,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } else { if (replied.compareAndSet(false, true)) { - GridNearTxPrepareResponse res = createPrepareResponse(); + GridNearTxPrepareResponse res = createPrepareResponse(this.err.get()); try { sendPrepareResponse(res); @@ -659,12 +667,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } /** + * @param prepErr Error. * @return Prepare response. */ - private GridNearTxPrepareResponse createPrepareResponse() { - // Send reply back to originating near node. 
- Throwable prepErr = err.get(); - + private GridNearTxPrepareResponse createPrepareResponse(@Nullable Throwable prepErr) { assert F.isEmpty(tx.invalidPartitions()); GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( @@ -981,7 +987,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (err0 != null) { err.compareAndSet(null, err0); - final GridNearTxPrepareResponse res = createPrepareResponse(); + final GridNearTxPrepareResponse res = createPrepareResponse(err.get()); tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 47c1d21..5488bb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -809,18 +809,22 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim remap(res); } catch (IgniteCheckedException e) { + err.compareAndSet(null, e); + onDone(e); } } }); } else { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + ClusterTopologyCheckedException err0 = new ClusterTopologyCheckedException( "Cluster topology changed while client transaction is preparing."); - 
err.retryReadyFuture(affFut); + err0.retryReadyFuture(affFut); + + err.compareAndSet(null, err0); - onDone(err); + onDone(err0); } } catch (IgniteCheckedException e) { @@ -829,6 +833,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim GridNearOptimisticSerializableTxPrepareFuture.this); } + err.compareAndSet(null, e); + onDone(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c1e9202..1f51b8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1617,8 +1617,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { { for (final IgniteInternalTx tx : txs()) { if (nearVer.equals(tx.nearXidVersion())) { - TransactionState state = tx.state(); - IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); if (prepFut != null && !prepFut.isDone()) { @@ -1648,6 +1646,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return fut0; } + TransactionState state = tx.state(); + if (state == PREPARED || state == COMMITTING || state == COMMITTED) { if (--txNum == 0) { if (fut != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/15432ae9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 8c135ad..7d37b24 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; @@ -107,6 +108,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** */ private boolean client; + /** */ + private static int cacheId; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -126,6 +130,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); + cacheId = 0; + startGridsMultiThreaded(SRVS); client = true; @@ -3067,6 +3073,142 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testIncrementTxMultipleNodeRestart() throws Exception { + incrementTxMultiple(false, false, true); + } + + /** + * @param nearCache If {@code true} near cache is enabled. + * @param store If {@code true} cache store is enabled. + * @param restart If {@code true} restarts one node. + * @throws Exception If failed. 
+ */ + private void incrementTxMultiple(boolean nearCache, boolean store, final boolean restart) throws Exception { + final Ignite srv = ignite(1); + + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, store, false); + + final List<Ignite> clients = clients(); + + final String cacheName = srv.createCache(ccfg).getName(); + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + final List<IgniteCache<Integer, Integer>> caches = new ArrayList<>(); + + for (Ignite client : clients) { + if (nearCache) + caches.add(client.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>())); + else + caches.add(client.<Integer, Integer>cache(cacheName)); + } + + IgniteInternalFuture<?> restartFut = null; + + if (restart) { + restartFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!stop.get()) { + stopGrid(0); + + U.sleep(300); + + Ignite ignite = startGrid(0); + + assertFalse(ignite.configuration().isClientMode()); + } + + return null; + } + }); + } + + for (int i = 0; i < 30; i += 2) { + final AtomicInteger cntr = new AtomicInteger(); + + final Integer key1 = i; + final Integer key2 = i + 1; + + final AtomicInteger threadIdx = new AtomicInteger(); + + final int THREADS = 10; + + final CyclicBarrier barrier = new CyclicBarrier(THREADS); + + final ConcurrentSkipListSet<Integer> vals1 = new ConcurrentSkipListSet<>(); + final ConcurrentSkipListSet<Integer> vals2 = new ConcurrentSkipListSet<>(); + + GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = threadIdx.getAndIncrement() % caches.size(); + + IgniteCache<Integer, Integer> cache = caches.get(idx); + + Ignite ignite = cache.unwrap(Ignite.class); + + IgniteTransactions txs = ignite.transactions(); + + log.info("Started update thread: " + ignite.name()); + + barrier.await(); + + for (int i = 0; i < 1000; i++) { + try { + try (Transaction 
tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val1 = cache.get(key1); + Integer val2 = cache.get(key2); + + Integer newVal1 = val1 == null ? 1 : val1 + 1; + Integer newVal2 = val2 == null ? 1 : val2 + 1; + + cache.put(key1, newVal1); + cache.put(key2, newVal2); + + tx.commit(); + + assertTrue(vals1.add(newVal1)); + assertTrue(vals2.add(newVal2)); + } + + cntr.incrementAndGet(); + } + catch (TransactionOptimisticException ignore) { + // Retry. + } + catch (IgniteException | CacheException e) { + assertTrue("Unexpected exception [err=" + e + ", cause=" + e.getCause() + ']', + restart && X.hasCause(e, ClusterTopologyCheckedException.class)); + } + } + + return null; + } + }, THREADS, "update-thread").get(); + + log.info("Iteration [iter=" + i + ", val=" + cntr.get() + ']'); + + assertTrue(cntr.get() > 0); + + checkValue(key1, cntr.get(), cacheName, restart); + checkValue(key2, cntr.get(), cacheName, restart); + } + + stop.set(true); + + if (restartFut != null) + restartFut.get(); + } + finally { + stop.set(true); + + destroyCache(srv, cacheName); + } + } + + /** + * @throws Exception If failed. + */ public void testGetRemoveTx() throws Exception { getRemoveTx(false, false); } @@ -3229,6 +3371,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testAccountTxNodeRestart() throws Exception { + accountTx(false, false, false, true, TestMemoryMode.HEAP); + } + + /** * @param getAll If {@code true} uses getAll/putAll in transaction. * @param nearCache If {@code true} near cache is enabled. * @param nonSer If {@code true} starts threads executing non-serializable transactions. 
@@ -4183,6 +4332,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { boolean nearCache) { CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + ccfg.setName("testCache-" + cacheId++); + ccfg.setCacheMode(cacheMode); ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setWriteSynchronizationMode(syncMode);
