Repository: ignite Updated Branches: refs/heads/master 6a09c4e20 -> b7a0adc71
IGNITE-8509 Fixed and reworkd tx rollback tests - Fixes #4150. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7a0adc7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7a0adc7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7a0adc7 Branch: refs/heads/master Commit: b7a0adc711bb28e7d23f5392bbd588c666cedc22 Parents: 6a09c4e Author: Aleksei Scherbakov <[email protected]> Authored: Mon Sep 10 14:33:40 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Sep 10 14:33:40 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 1 - .../cache/transactions/TxRollbackAsyncTest.java | 255 ++++++++++--------- 2 files changed, 139 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b7a0adc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 1e25c93..2e66e5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -611,7 +611,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } } - /** * @param cacheMsg Cache message. * @param nodeId Node ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/b7a0adc7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java index 7968be3..4ca8ba3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.transactions; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -25,7 +24,11 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,6 +49,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -55,6 +59,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -81,6 +87,7 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionRollbackException; +import static java.lang.Thread.interrupted; import static java.lang.Thread.yield; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -344,13 +351,12 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { CountDownLatch waitCommit = new CountDownLatch(1); + // Used for passing tx instance to rollback thread. IgniteInternalFuture<?> lockFut = lockInTx(holdLockNode, keyLocked, waitCommit, 0); U.awaitQuiet(keyLocked); - final CountDownLatch rollbackLatch = new CountDownLatch(1); - - final int txCnt = 10000; + final int txCnt = 1000; final IgniteKernal k = (IgniteKernal)tryLockNode; @@ -358,7 +364,16 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { final GridCacheContext<Object, Object> cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME)); - final AtomicBoolean stop = new AtomicBoolean(); + GridFutureAdapter<Transaction> txReadyFut = new GridFutureAdapter<>(); + + long seed = System.currentTimeMillis(); + + Random r = new Random(seed); + + log.info("Running: node0=" + holdLockNode.cluster().localNode().consistentId() + + ", node1=" + tryLockNode.cluster().localNode().consistentId() + + ", useTimeout=" + useTimeout + + ", seed=" + seed); IgniteInternalFuture<?> txFut = multithreadedAsync(new Runnable() { @Override public void run() { @@ -369,10 +384,10 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { assertTrue(tx0 == null || tx0.state() == ROLLED_BACK); - rollbackLatch.countDown(); - try (Transaction tx = tryLockNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, - useTimeout ? 500 : 0, 1)) { + useTimeout ? 50 : 0, 1)) { + + txReadyFut.onDone(tx); // Will block on lock request until rolled back asynchronously. Object o = tryLockNode.cache(CACHE_NAME).get(0); @@ -384,29 +399,30 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { } } - stop.set(true); + txReadyFut.onDone((Transaction)null); } }, 1, "tx-get-thread"); IgniteInternalFuture<?> rollbackFut = multithreadedAsync(new Runnable() { @Override public void run() { - U.awaitQuiet(rollbackLatch); - - doSleep(50); - Set<IgniteUuid> rolledBackVers = new HashSet<>(); int proc = 1; - while(!stop.get()) { - for (Transaction tx : tryLockNode.transactions().localActiveTransactions()) { + while(true) { + try { + Transaction tx = txReadyFut.get(); + + txReadyFut.reset(); + + if (tx == null) + break; + + doSleep(r.nextInt(15)); // Wait a bit to reduce chance of rolling back empty transactions. + if (rolledBackVers.contains(tx.xid())) fail("Rollback version is expected"); - // Skip write transaction. - if (LABEL.equals(tx.label())) - continue; - try { if (proc % 2 == 0) tx.rollback(); @@ -419,14 +435,15 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { rolledBackVers.add(tx.xid()); - if (proc % 1000 == 0) + if (proc % 100 == 0) log.info("Rolled back: " + proc); proc++; } + catch (IgniteCheckedException e) { + fail(e.getMessage()); + } } - - assertEquals("Unexpected size", txCnt, rolledBackVers.size()); } }, 1, "tx-rollback-thread"); @@ -613,8 +630,6 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { * */ public void testMixedAsyncRollbackTypes() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-8509"); - final Ignite client = startClient(); final AtomicBoolean stop = new AtomicBoolean(); @@ -640,134 +655,124 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { final LongAdder failed = new LongAdder(); final LongAdder rolledBack = new LongAdder(); - IgniteInternalFuture<?> txFut = multithreadedAsync(new Runnable() { - @Override public void run() { - while (!stop.get()) { - int nodeId = r.nextInt(GRID_CNT + 1); + ConcurrentMap<Ignite, BlockingQueue<Transaction>> perNodeTxs = new ConcurrentHashMap<>(); - // Choose random node to start tx on. - Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? client : grid(nodeId); + for (Ignite ignite : G.allGrids()) + perNodeTxs.put(ignite, new ArrayBlockingQueue<>(1000)); - TransactionConcurrency conc = TC_VALS[r.nextInt(TC_VALS.length)]; - TransactionIsolation isolation = TI_VALS[r.nextInt(TI_VALS.length)]; + IgniteInternalFuture<?> txFut = multithreadedAsync(() -> { + while (!stop.get()) { + int nodeId = r.nextInt(GRID_CNT + 1); - long timeout = r.nextInt(50) + 50; // Timeout is necessary to prevent deadlocks. + // Choose random node to start tx on. + Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? client : grid(nodeId); - try (Transaction tx = node.transactions().txStart(conc, isolation, timeout, txSize)) { - int setSize = r.nextInt(txSize / 2) + 1; + TransactionConcurrency conc = TC_VALS[r.nextInt(TC_VALS.length)]; + TransactionIsolation isolation = TI_VALS[r.nextInt(TI_VALS.length)]; - for (int i = 0; i < setSize; i++) { - switch (r.nextInt(4)) { - case 0: - node.cache(CACHE_NAME).remove(r.nextInt(txSize)); + // Timeout is necessary otherwise deadlock is possible due to randomness of lock acquisition. + long timeout = r.nextInt(50) + 50; - break; + try (Transaction tx = node.transactions().txStart(conc, isolation, timeout, txSize)) { + BlockingQueue<Transaction> nodeQ = perNodeTxs.get(node); - case 1: - node.cache(CACHE_NAME).get(r.nextInt(txSize)); + nodeQ.put(tx); - break; + int setSize = r.nextInt(txSize / 2) + 1; - case 2: - final Integer v = (Integer)node.cache(CACHE_NAME).get(r.nextInt(txSize)); + for (int i = 0; i < setSize; i++) { + switch (r.nextInt(4)) { + case 0: + node.cache(CACHE_NAME).remove(r.nextInt(txSize)); - node.cache(CACHE_NAME).put(r.nextInt(txSize), (v == null ? 0 : v) + 1); + break; - break; + case 1: + node.cache(CACHE_NAME).get(r.nextInt(txSize)); - case 3: - node.cache(CACHE_NAME).put(r.nextInt(txSize), 0); + break; - break; + case 2: + final Integer v = (Integer)node.cache(CACHE_NAME).get(r.nextInt(txSize)); - default: - fail("Unexpected opcode"); - } - } + node.cache(CACHE_NAME).put(r.nextInt(txSize), (v == null ? 0 : v) + 1); - tx.commit(); + break; - completed.add(1); - } - catch (Throwable e) { - failed.add(1); + case 3: + node.cache(CACHE_NAME).put(r.nextInt(txSize), 0); + + break; + + default: + fail("Unexpected opcode"); + } } - total.add(1); + tx.commit(); + + completed.add(1); } + catch (Throwable e) { + failed.add(1); + } + + total.add(1); } }, threadCnt, "tx-thread"); final AtomicIntegerArray idx = new AtomicIntegerArray(GRID_CNT + 1); - IgniteInternalFuture<?> rollbackFut = multithreadedAsync(new Runnable() { - @Override public void run() { - int concurrentRollbackCnt = 5; - - List<IgniteFuture<?>> futs = new ArrayList<>(concurrentRollbackCnt); - - while (!stop.get()) { - // Choose node randomly. - final int nodeId = r.nextInt(GRID_CNT + 1); - - // Reserve node. - if (!idx.compareAndSet(nodeId, 0, 1)) { - yield(); + CIX1<Transaction> rollbackClo = new CIX1<Transaction>() { + @Override public void applyx(Transaction tx) throws IgniteCheckedException { + try { + IgniteFuture<Void> rollbackFut = tx.rollbackAsync(); - continue; - } + rollbackFut.listen(new IgniteInClosure<IgniteFuture<Void>>() { + @Override public void apply(IgniteFuture<Void> fut) { + tx.close(); + } + }); + } + catch (Throwable t) { + log.error("Exception on async rollback", t); - Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? client : grid(nodeId); + throw new IgniteCheckedException("Rollback failed", t); + } + } + }; - Collection<Transaction> transactions = node.transactions().localActiveTransactions(); + IgniteInternalFuture<?> rollbackFut = multithreadedAsync(() -> { + while (!interrupted()) { + // Choose node randomly. + final int nodeId = r.nextInt(GRID_CNT + 1); - for (Transaction tx : transactions) { - rolledBack.add(1); + // Reserve node for rollback. + if (!idx.compareAndSet(nodeId, 0, 1)) { + yield(); - if (rolledBack.sum() % 1000 == 0) - info("Processed: " + rolledBack.sum()); + continue; + } - try { - IgniteFuture<Void> rollbackFut = tx.rollbackAsync(); + Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? client : grid(nodeId); - rollbackFut.listen(new IgniteInClosure<IgniteFuture<Void>>() { - @Override public void apply(IgniteFuture<Void> fut) { - tx.close(); - } - }); + BlockingQueue<Transaction> nodeQ = perNodeTxs.get(node); - futs.add(rollbackFut); - } - catch (Throwable t) { - log.error("Exception on async rollback", t); + Transaction tx; - fail("Exception is not expected"); - } + // Rollback all transaction + while((tx = nodeQ.poll()) != null) { + rolledBack.add(1); - if (futs.size() == concurrentRollbackCnt) { - for (IgniteFuture<?> fut : futs) - try { - fut.get(); - } - catch (IgniteException e) { - log.warning("Future was rolled back with error", e); - } + doSleep(r.nextInt(50)); // Add random sleep to increase completed txs count. - futs.clear(); - } - } + if (rolledBack.sum() % 1000 == 0) + info("Rolled back so far: " + rolledBack.sum()); - idx.set(nodeId, 0); + rollbackClo.apply(tx); } - for (IgniteFuture<?> fut : futs) - try { - fut.get(); - } - catch (Throwable t) { - // No-op. - } - + idx.set(nodeId, 0); } }, 3, "rollback-thread"); // Rollback by multiple threads. @@ -775,9 +780,27 @@ public class TxRollbackAsyncTest extends GridCommonAbstractTest { stop.set(true); - txFut.get(); + txFut.get(); // Stop tx generation. - rollbackFut.get(); + rollbackFut.cancel(); + + try { + rollbackFut.get(); + } + catch (IgniteFutureCancelledCheckedException e) { + // Expected. + } + + // Rollback remaining transactions. + for (BlockingQueue<Transaction> queue : perNodeTxs.values()) { + Transaction tx; + + while((tx = queue.poll()) != null) { + rolledBack.add(1); + + rollbackClo.apply(tx); + } + } log.info("total=" + total.sum() + ", completed=" + completed.sum() + ", failed=" + failed.sum() + ", rolledBack=" + rolledBack.sum());
