Improved and enabled GridCacheStopSelfTest.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/722aacfd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/722aacfd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/722aacfd Branch: refs/heads/ignite-1282 Commit: 722aacfdfb9a89ad8696c73f896c2e0643eaa86d Parents: c584cca Author: sboikov <[email protected]> Authored: Thu Nov 26 16:41:59 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Nov 26 16:41:59 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLockFuture.java | 1 + .../near/GridNearTxFinishFuture.java | 14 +- .../processors/cache/GridCacheStopSelfTest.java | 143 +++++++++++++++++-- 3 files changed, 140 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/722aacfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 491ccd2..2b5d5a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -728,6 +728,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * Completeness callback. * * @param success {@code True} if lock was acquired. + * @param stopping {@code True} if node is stopping. * @return {@code True} if complete by this operation. */ private boolean onComplete(boolean success, boolean stopping) { http://git-wip-us.apache.org/repos/asf/ignite/blob/722aacfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index f76fc96..291c88a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -25,6 +25,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -203,6 +204,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu boolean marked = tx.setRollbackOnly(); + if (err instanceof NodeStoppingException) + return super.onDone(null, err); + if (err instanceof IgniteTxRollbackCheckedException) { if (marked) { try { @@ -241,13 +245,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } } - if (tx.onePhaseCommit()) { - boolean commit = this.commit && err == null; + if (tx.onePhaseCommit()) { + boolean commit = this.commit && err == null; - finishOnePhase(commit); + finishOnePhase(commit); - tx.tmFinish(commit); - } + tx.tmFinish(commit); + } if (super.onDone(tx0, err)) { if (error() instanceof IgniteTxHeuristicCheckedException) { http://git-wip-us.apache.org/repos/asf/ignite/blob/722aacfd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java index 59c899d..a34857f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java @@ -21,36 +21,43 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import javax.cache.CacheException; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Tests correct cache stopping. */ public class GridCacheStopSelfTest extends GridCommonAbstractTest { - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1393"); - } - /** */ private static final String EXPECTED_MSG = "Cache has been closed or destroyed"; /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ private boolean atomic; /** */ @@ -62,7 +69,7 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest { TcpDiscoverySpi disc = new TcpDiscoverySpi(); - disc.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + disc.setIpFinder(ipFinder); cfg.setDiscoverySpi(disc); @@ -70,6 +77,9 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest { ccfg.setCacheMode(replicated ? REPLICATED : PARTITIONED); + if (!replicated) + ccfg.setBackups(1); + ccfg.setAtomicityMode(atomic ? ATOMIC : TRANSACTIONAL); ccfg.setSwapEnabled(true); @@ -126,6 +136,112 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testStopMultithreaded() throws Exception { + try { + startGrid(0); + + for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + + startGridsMultiThreaded(1, 3); + + final AtomicInteger threadIdx = new AtomicInteger(0); + + final IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = threadIdx.getAndIncrement(); + + IgniteKernal node = (IgniteKernal)ignite(idx % 3 + 1); + + IgniteCache<Integer, Integer> cache = node.cache(null); + + while (true) { + try { + cacheOperations(node, cache); + } + catch (Exception e) { + if (node.isStopping()) + break; + } + } + + return null; + } + }, 20, "tx-node-stop-thread"); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteKernal node = (IgniteKernal)ignite(0); + + IgniteCache<Integer, Integer> cache = node.cache(null); + + while (!fut1.isDone()) { + try { + cacheOperations(node, cache); + } + catch (Exception ignore) { + // No-op. + } + } + + return null; + } + }, 2, "tx-thread"); + + Thread.sleep(3000); + + final AtomicInteger nodeIdx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = nodeIdx.getAndIncrement(); + + log.info("Stop node: " + idx); + + ignite(idx).close(); + + return null; + } + }, 3, "stop-node"); + + fut1.get(); + fut2.get(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param node Node. + * @param cache Cache. + */ + private void cacheOperations(Ignite node, IgniteCache<Integer, Integer> cache) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + Integer key = rnd.nextInt(1000); + + cache.put(key, key); + + cache.get(key); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + cache.put(key, key); + + tx.commit(); + } + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, key); + + tx.commit(); + } + } + + /** * @param startTx If {@code true} starts transactions. * @throws Exception If failed. */ @@ -143,8 +259,10 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest { assertNotNull(cache); - assertEquals(atomic ? ATOMIC : TRANSACTIONAL, cache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); - assertEquals(replicated ? REPLICATED : PARTITIONED, cache.getConfiguration(CacheConfiguration.class).getCacheMode()); + CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class); + + assertEquals(atomic ? ATOMIC : TRANSACTIONAL, ccfg.getAtomicityMode()); + assertEquals(replicated ? REPLICATED : PARTITIONED, ccfg.getCacheMode()); Collection<IgniteInternalFuture<?>> putFuts = new ArrayList<>(); @@ -155,7 +273,9 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest { @Override public Void call() throws Exception { try { if (startTx) { - try (Transaction tx = grid(0).transactions().txStart()) { + TransactionConcurrency concurrency = key % 2 == 0 ? OPTIMISTIC : PESSIMISTIC; + + try (Transaction tx = grid(0).transactions().txStart(concurrency, REPEATABLE_READ)) { cache.put(key, key); readyLatch.countDown(); @@ -173,12 +293,9 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest { cache.put(key, key); } } - catch (CacheException | IgniteException e) { + catch (CacheException | IgniteException | IllegalStateException e) { log.info("Ignore error: " + e); } - catch (IllegalStateException e) { - assertTrue(e.getMessage().startsWith(EXPECTED_MSG)); - } return null; }
