IGNITE-8033 Fixed flaky failure of TxOptimisticDeadlockDetectionCrossCacheTest
Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b39f135 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b39f135 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b39f135 Branch: refs/heads/ignite-7708 Commit: 7b39f1355cf7b0d4169622cec2936184168aba99 Parents: 2edcb22 Author: Aleksey Plekhanov <[email protected]> Authored: Tue Apr 17 18:27:53 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Tue Apr 17 18:50:15 2018 +0300 ---------------------------------------------------------------------- ...timisticDeadlockDetectionCrossCacheTest.java | 147 +++++++------------ 1 file changed, 50 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7b39f135/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java index 5d1374c..056b093 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java @@ -18,30 +18,21 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; -import java.util.Set; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -57,9 +48,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA * */ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest { - /** Nodes count. */ - private static final int NODES_CNT = 2; - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -73,10 +61,6 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr cfg.setDiscoverySpi(discoSpi); } - TcpCommunicationSpi commSpi = new TestCommunicationSpi(); - - cfg.setCommunicationSpi(commSpi); - CacheConfiguration ccfg0 = defaultCacheConfiguration(); ccfg0.setName("cache0"); @@ -96,42 +80,46 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr return cfg; } - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(NODES_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - stopAllGrids(); - } - /** * @throws Exception If failed. */ public void testDeadlock() throws Exception { - // Sometimes boh transactions perform commit, so we repeat attempt. - while (!doTestDeadlock()) {} + startGrids(2); + + try { + doTestDeadlock(); + } + finally { + stopAllGrids(); + } } /** * @throws Exception If failed. */ private boolean doTestDeadlock() throws Exception { - TestCommunicationSpi.init(2); - - final CyclicBarrier barrier = new CyclicBarrier(2); - final AtomicInteger threadCnt = new AtomicInteger(); final AtomicBoolean deadlock = new AtomicBoolean(); final AtomicInteger commitCnt = new AtomicInteger(); + grid(0).events().localListen(new CacheLocksListener(), EventType.EVT_CACHE_OBJECT_LOCKED); + + AffinityTopologyVersion waitTopVer = new AffinityTopologyVersion(2, 1); + + IgniteInternalFuture<?> exchFut = grid(0).context().cache().context().exchange().affinityReadyFuture(waitTopVer); + + if (exchFut != null && !exchFut.isDone()) { + log.info("Waiting for topology exchange future [waitTopVer=" + waitTopVer + ", curTopVer=" + + grid(0).context().cache().context().exchange().readyAffinityVersion() + ']'); + + exchFut.get(); + } + + log.info("Finished topology exchange future [curTopVer=" + + grid(0).context().cache().context().exchange().readyAffinityVersion() + ']'); + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { @Override public void run() { int threadNum = threadCnt.getAndIncrement(); @@ -152,8 +140,6 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr cache1.put(key1, 0); - barrier.await(); - int key2 = primaryKey(cache2); log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + @@ -171,23 +157,23 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr hasCause(e, TransactionDeadlockException.class) ) { if (deadlock.compareAndSet(false, true)) - U.error(log, "At least one stack trace should contain " + - TransactionDeadlockException.class.getSimpleName(), e); + log.info("Successfully set deadlock flag"); + else + log.info("Deadlock flag was already set"); } + else + log.warning("Got not deadlock exception", e); } } }, 2, "tx-thread"); fut.get(); - if (commitCnt.get() == 2) - return false; + assertFalse("Commits must fail", commitCnt.get() == 2); assertTrue(deadlock.get()); - for (int i = 0; i < NODES_CNT ; i++) { - Ignite ignite = ignite(i); - + for (Ignite ignite : G.allGrids()) { IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm(); Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures(); @@ -199,59 +185,26 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr } /** + * Listener for cache lock events. * + * To ensure deadlock this listener blocks transaction thread until both threads acquire first lock. */ - private static class TestCommunicationSpi extends TcpCommunicationSpi { - /** Tx count. */ - private static volatile int TX_CNT; - - /** Tx ids. */ - private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>(); - - /** - * @param txCnt Tx count. - */ - private static void init(int txCnt) { - TX_CNT = txCnt; - TX_IDS.clear(); - } + private static class CacheLocksListener implements IgnitePredicate<Event> { + /** Latch. */ + private final CountDownLatch latch = new CountDownLatch(2); /** {@inheritDoc} */ - @Override public void sendMessage( - final ClusterNode node, - final Message msg, - final IgniteInClosure<IgniteException> ackC - ) throws IgniteSpiException { - if (msg instanceof GridIoMessage) { - Message msg0 = ((GridIoMessage)msg).message(); - - if (msg0 instanceof GridNearTxPrepareRequest) { - final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0; - - GridCacheVersion txId = req.version(); - - if (TX_IDS.contains(txId)) { - while (TX_IDS.size() < TX_CNT) { - try { - U.sleep(50); - } - catch (IgniteInterruptedCheckedException e) { - e.printStackTrace(); - } - } - } - } - else if (msg0 instanceof GridNearTxPrepareResponse) { - GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0; + @Override public boolean apply(Event evt) { + latch.countDown(); - GridCacheVersion txId = res.version(); - - TX_IDS.add(txId); - } + try { + latch.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); } - super.sendMessage(node, msg, ackC); + return true; } } - }
