This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 056189cbde0 IGNITE-23138 Fix flaky TxWithKeyContentionSelfTest (Cache 
12 group) (#11512)
056189cbde0 is described below

commit 056189cbde0688cd48106ac9b7ddfef2413ee08b
Author: Semyon Zikunov <[email protected]>
AuthorDate: Mon Nov 18 19:31:01 2024 +1000

    IGNITE-23138 Fix flaky TxWithKeyContentionSelfTest (Cache 12 group) (#11512)
---
 .../transactions/TxWithKeyContentionSelfTest.java  | 208 ++++++---------------
 1 file changed, 60 insertions(+), 148 deletions(-)

diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
index fe2278918e2..4d66bb3a58d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java
@@ -17,17 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -35,21 +32,16 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -59,21 +51,14 @@ import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
 
 /** Tests tx key contention detection functional. */
 public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest {
-    /** Client flag. */
-    private boolean client;
-
-    /** Near cache flag. */
-    private boolean nearCache;
+    /** */
+    @Rule
+    public TestName testName = new TestName();
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(name);
 
-        cfg.setConsistentId("NODE_" + name.substring(name.length() - 1));
-
-        if (client)
-            cfg.setClientMode(true);
-
         cfg.setDataStorageConfiguration(
             new DataStorageConfiguration()
                 .setDefaultDataRegionConfiguration(
@@ -82,24 +67,12 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
                 )
         );
 
-        TestRecordingCommunicationSpi commSpi = new 
TestRecordingCommunicationSpi();
-
-        cfg.setCommunicationSpi(commSpi);
-
-        cfg.setCacheConfiguration(getCacheConfiguration(DEFAULT_CACHE_NAME));
-
-        if (client) {
-            cfg.setConsistentId("Client");
-
-            cfg.setClientMode(client);
-        }
-
         return cfg;
     }
 
     /** */
-    protected CacheConfiguration<?, ?> getCacheConfiguration(String name) {
-        CacheConfiguration<Object, Object> ccfg = new 
CacheConfiguration<>(name)
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(boolean 
nearCache) {
+        CacheConfiguration<Integer, Integer> ccfg = new 
CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
             .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
             
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
             .setAffinity(new RendezvousAffinityFunction(false, 16))
@@ -134,7 +107,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testPessimisticRepeatableReadCheckContentionTxMetric() throws 
Exception {
-        runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
+        runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, false);
     }
 
     /**
@@ -145,9 +118,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testPessimisticRepeatableReadCheckContentionTxMetricNear() 
throws Exception {
-        nearCache = true;
-
-        runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ);
+        runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ, true);
     }
 
     /**
@@ -156,7 +127,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testPessimisticReadCommitedCheckContentionTxMetric() throws 
Exception {
-        runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
+        runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, false);
     }
 
     /**
@@ -165,9 +136,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testPessimisticReadCommitedCheckContentionTxMetricNear() 
throws Exception {
-        nearCache = true;
-
-        runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED);
+        runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED, true);
     }
 
     /**
@@ -176,7 +145,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testOptimisticReadCommittedCheckContentionTxMetric() throws 
Exception {
-        runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
+        runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, false);
     }
 
     /**
@@ -185,9 +154,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testOptimisticReadCommittedCheckContentionTxMetricNear() 
throws Exception {
-        nearCache = true;
-
-        runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED);
+        runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED, true);
     }
 
     /**
@@ -196,7 +163,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testOptimisticRepeatableReadCheckContentionTxMetric() throws 
Exception {
-        runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
+        runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, false);
     }
 
     /**
@@ -205,9 +172,7 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
     @Test
     @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = 
"30000")
     public void testOptimisticRepeatableReadCheckContentionTxMetricNear() 
throws Exception {
-        nearCache = true;
-
-        runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ);
+        runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ, true);
     }
 
     /** Tests metric correct results while tx collisions occured.
@@ -216,120 +181,67 @@ public class TxWithKeyContentionSelfTest extends 
GridCommonAbstractTest {
      * @param isolation Isolation level.
      * @throws Exception If failed.
      */
-    private void runKeyCollisionsMetric(TransactionConcurrency concurrency, 
TransactionIsolation isolation) throws Exception {
+    private void runKeyCollisionsMetric(TransactionConcurrency concurrency, 
TransactionIsolation isolation, boolean nearCache)
+            throws Exception {
         Ignite ig = startGridsMultiThreaded(3);
 
-        int contCnt = (int)U.staticField(IgniteTxManager.class, 
"COLLISIONS_QUEUE_THRESHOLD") * 5;
-
-        CountDownLatch txLatch = new CountDownLatch(contCnt);
-
-        ig.cluster().state(ClusterState.ACTIVE);
-
-        client = true;
-
-        Ignite cl = startGrid();
-
-        IgniteTransactions cliTxMgr = cl.transactions();
+        int contCnt = (int)U.staticField(IgniteTxManager.class, 
"COLLISIONS_QUEUE_THRESHOLD") * 2;
 
-        IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+        Ignite cl = startClientGrid();
 
-        IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Integer> clientCache = 
cl.createCache(cacheConfiguration(nearCache));
 
-        final Integer keyId = primaryKey(cache);
+        final Integer keyId = primaryKey(ig.cache(DEFAULT_CACHE_NAME));
 
-        CountDownLatch blockOnce = new CountDownLatch(1);
+        IgniteTransactions transactions = cl.transactions();
 
-        for (Ignite ig0 : G.allGrids()) {
-            if (ig0.configuration().isClientMode())
-                continue;
+        assertFalse(checkMetrics(ig));
 
-            TestRecordingCommunicationSpi commSpi0 =
-                
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
+        AtomicBoolean doTest = new AtomicBoolean(true);
 
-            commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, 
Message>() {
-                @Override public boolean apply(ClusterNode node, Message msg) {
-                    if (msg instanceof GridNearTxFinishResponse && 
blockOnce.getCount() > 0) {
-                        blockOnce.countDown();
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(
+            () -> {
+                while (doTest.get()) {
+                    try (Transaction tx = transactions.txStart(concurrency, 
isolation)) {
+                        clientCache.put(keyId, 0);
 
-                        return true;
+                        tx.commit();
                     }
-
-                    return false;
-                }
-            });
-        }
-
-        IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
-            try (Transaction tx = cliTxMgr.txStart(concurrency, isolation)) {
-                cache0.put(keyId, 0);
-                tx.commit();
-            }
-        });
-
-        blockOnce.await();
-
-        GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>();
-
-        for (int i = 0; i < contCnt; ++i) {
-            IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
-                try (Transaction tx = cliTxMgr.txStart(concurrency, 
isolation)) {
-                    cache0.put(keyId, 0);
-
-                    tx.commit();
-
-                    txLatch.countDown();
                 }
-            });
-
-            finishFut.add(f0);
+            },
+            contCnt,
+            "txThread-" + testName.getMethodName());
+
+        try {
+            assertTrue(GridTestUtils.waitForCondition(
+                () -> checkMetrics(ig),
+                getTestTimeout()));
         }
-
-        finishFut.markInitialized();
-
-        for (Ignite ig0 : G.allGrids()) {
-            TestRecordingCommunicationSpi commSpi0 =
-                
(TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi();
-
-            if (ig0.configuration().isClientMode())
-                continue;
-
-            commSpi0.stopBlock();
+        finally {
+            doTest.set(false);
+            fut.get(getTestTimeout());
         }
+    }
 
+    /**
+     * Checks if the transaction collision metrics contain the string 
"queueSize" for the given Ignite instance.
+     *
+     * @param ig Ignite instance.
+     * @return {@code true} if the metrics contain "queueSize"; otherwise 
{@code false}.
+     */
+    private static boolean checkMetrics(Ignite ig) {
         IgniteTxManager srvTxMgr = 
((IgniteEx)ig).context().cache().context().tm();
 
-        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    U.invoke(IgniteTxManager.class, srvTxMgr, 
"collectTxCollisionsInfo");
-                }
-                catch (IgniteCheckedException e) {
-                    fail(e.toString());
-                }
-
-                CacheMetrics metrics = 
ig.cache(DEFAULT_CACHE_NAME).localMetrics();
-
-                String coll1 = metrics.getTxKeyCollisions();
-
-                if (!coll1.isEmpty()) {
-                    String coll2 = metrics.getTxKeyCollisions();
-
-                    // check idempotent
-                    assertEquals(coll1, coll2);
-
-                    assertTrue(coll1.contains("queueSize"));
-
-                    return true;
-                }
-                else
-                    return false;
-            }
-        }, 10_000));
-
-        f.get();
-
-        finishFut.get();
+        try {
+            U.invoke(IgniteTxManager.class, srvTxMgr, 
"collectTxCollisionsInfo");
+        }
+        catch (IgniteCheckedException e) {
+            fail(e.toString());
+        }
 
-        txLatch.await();
+        return ig.cache(DEFAULT_CACHE_NAME)
+            .localMetrics()
+            .getTxKeyCollisions()
+            .contains("queueSize");
     }
 }

Reply via email to