ignite-7049 Fixed error in tx timeout processing for optimistic/serializable tx
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd0d2ebf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd0d2ebf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd0d2ebf Branch: refs/heads/ignite-zk Commit: cd0d2ebf6a4294be851fb316f9b2f6b4ce6cb321 Parents: 9398813 Author: Aleksei Scherbakov <[email protected]> Authored: Sat Dec 2 20:36:29 2017 +0300 Committer: sboikov <[email protected]> Committed: Sat Dec 2 20:36:29 2017 +0300 ---------------------------------------------------------------------- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../transactions/TxRollbackOnTimeoutTest.java | 186 ++++++++++++++++++- 2 files changed, 186 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cd0d2ebf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 1da0589..beb1e16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -184,7 +184,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } } - if (e instanceof IgniteTxOptimisticCheckedException || e instanceof IgniteTxTimeoutCheckedException) { + if (e instanceof IgniteTxOptimisticCheckedException) { if (m != null) tx.removeMapping(m.primary().id()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cd0d2ebf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index e1c6c10..6aa3bdd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -18,24 +18,31 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; @@ -44,8 +51,11 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionOptimisticException; import org.apache.ignite.transactions.TransactionTimeoutException; +import org.jsr166.LongAdder8; +import static java.lang.Thread.sleep; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -56,6 +66,9 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA */ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { /** */ + private static final long DURATION = 60 * 1000L; + + /** */ private static final long TX_MIN_TIMEOUT = 1; /** */ @@ -73,6 +86,8 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + boolean client = "client".equals(igniteInstanceName); cfg.setClientMode(client); @@ -373,6 +388,175 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { } /** + * Test timeouts with random values and different tx configurations. + */ + public void testRandomMixedTxConfigurations() throws Exception { + final Ignite client = startClient(); + + final AtomicBoolean stop = new AtomicBoolean(); + + final long seed = System.currentTimeMillis(); + + final Random r = new Random(seed); + + log.info("Using seed: " + seed); + + final int threadsCnt = Runtime.getRuntime().availableProcessors() * 2; + + for (int k = 0; k < threadsCnt; k++) + grid(0).cache(CACHE_NAME).put(k, (long)0); + + final TransactionConcurrency[] TC_VALS = TransactionConcurrency.values(); + final TransactionIsolation[] TI_VALS = TransactionIsolation.values(); + + final LongAdder8 cntr0 = new LongAdder8(); + final LongAdder8 cntr1 = new LongAdder8(); + final LongAdder8 cntr2 = new LongAdder8(); + final LongAdder8 cntr3 = new LongAdder8(); + + final IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override public void run() { + while (!stop.get()) { + int nodeId = r.nextInt(GRID_CNT + 1); + + Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? client : grid(nodeId); + + TransactionConcurrency conc = TC_VALS[r.nextInt(TC_VALS.length)]; + TransactionIsolation isolation = TI_VALS[r.nextInt(TI_VALS.length)]; + + int k = r.nextInt(threadsCnt); + + long timeout = r.nextInt(200) + 50; + + // Roughly 50% of transactions should time out. + try (Transaction tx = node.transactions().txStart(conc, isolation, timeout, 1)) { + cntr0.add(1); + + final Long v = (Long)node.cache(CACHE_NAME).get(k); + + final int delay = r.nextInt(400); + + if (delay > 0) + sleep(delay); + + node.cache(CACHE_NAME).put(k, v + 1); + + tx.commit(); + + cntr1.add(1); + } + catch (TransactionOptimisticException | InterruptedException e) { + // Expected. + cntr3.add(1); + } + catch (TransactionTimeoutException e) { + cntr2.add(1); + } + catch (CacheException e) { + assertEquals(TransactionTimeoutException.class, X.getCause(e).getClass()); + + cntr2.add(1); + } + } + } + }, threadsCnt, "tx-async-thread"); + + sleep(DURATION); + + stop.set(true); + + fut.get(10_000); + + log.info("Tx test stats: started=" + cntr0.sum() + + ", completed=" + cntr1.sum() + + ", failed=" + cntr3.sum() + + ", timedOut=" + cntr2.sum()); + + assertEquals("Expected finished count same as started count", cntr0.sum(), cntr1.sum() + cntr2.sum() + cntr3.sum()); + } + + /** + * Tests timeout on DHT primary node for all tx configurations. + * + * @throws Exception If failed. + */ + public void testTimeoutOnPrimaryDHTNode() throws Exception { + final ClusterNode n0 = grid(0).affinity(CACHE_NAME).mapKeyToNode(0); + + final Ignite prim = G.ignite(n0.id()); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) + testTimeoutOnPrimaryDhtNode0(prim, concurrency, isolation); + } + } + + /** + * + * @param prim Primary node. + * @param conc Concurrency. + * @param isolation Isolation. + + * @throws Exception If failed. + */ + private void testTimeoutOnPrimaryDhtNode0(final Ignite prim, final TransactionConcurrency conc, + final TransactionIsolation isolation) + throws Exception { + + log.info("concurrency=" + conc + ", isolation=" + isolation); + + // Force timeout on primary DHT node by blocking DHT prepare response. + toggleBlocking(GridDhtTxPrepareResponse.class, prim, true); + + final int val = 0; + + try { + multithreaded(new Runnable() { + @Override public void run() { + try (Transaction txOpt = prim.transactions().txStart(conc, isolation, 300, 1)) { + + prim.cache(CACHE_NAME).put(val, val); + + txOpt.commit(); + } + } + }, 1, "tx-async-thread"); + + fail(); + } + catch (TransactionTimeoutException e) { + // Expected. + } + + toggleBlocking(GridDhtTxPrepareResponse.class, prim, false); + + AffinityTopologyVersion topVer = new AffinityTopologyVersion(GRID_CNT + 1, 0); + + for (Ignite ignite : G.allGrids()) + ((IgniteEx)ignite).context().cache().context().partitionReleaseFuture(topVer).get(10_000); + } + + /** + * @param cls Message class. + * @param nodeToBlock Node to block. + * @param block Block. + */ + private void toggleBlocking(Class<? extends Message> cls, Ignite nodeToBlock, boolean block) { + for (Ignite ignite : G.allGrids()) { + if (ignite == nodeToBlock) + continue; + + final TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + if (block) + spi.blockMessages(cls, nodeToBlock.name()); + else + spi.stopBlock(true); + } + } + + /** * @param concurrency Concurrency. * @param isolation Isolation. * @param op Operation to test. @@ -652,4 +836,4 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { fut2.get(); } -} \ No newline at end of file +}
