Repository: ignite
Updated Branches:
  refs/heads/master 6a09c4e20 -> b7a0adc71


IGNITE-8509 Fixed and reworked tx rollback tests - Fixes #4150.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7a0adc7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7a0adc7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7a0adc7

Branch: refs/heads/master
Commit: b7a0adc711bb28e7d23f5392bbd588c666cedc22
Parents: 6a09c4e
Author: Aleksei Scherbakov <[email protected]>
Authored: Mon Sep 10 14:33:40 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Mon Sep 10 14:33:40 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |   1 -
 .../cache/transactions/TxRollbackAsyncTest.java | 255 ++++++++++---------
 2 files changed, 139 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7a0adc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 1e25c93..2e66e5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -611,7 +611,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         }
     }
 
-
     /**
      * @param cacheMsg Cache message.
      * @param nodeId Node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7a0adc7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
index 7968be3..4ca8ba3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackAsyncTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,7 +24,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,6 +49,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
@@ -55,6 +59,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -81,6 +87,7 @@ import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionRollbackException;
 
+import static java.lang.Thread.interrupted;
 import static java.lang.Thread.yield;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -344,13 +351,12 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
 
         CountDownLatch waitCommit = new CountDownLatch(1);
 
+        // Used for passing tx instance to rollback thread.
         IgniteInternalFuture<?> lockFut = lockInTx(holdLockNode, keyLocked, 
waitCommit, 0);
 
         U.awaitQuiet(keyLocked);
 
-        final CountDownLatch rollbackLatch = new CountDownLatch(1);
-
-        final int txCnt = 10000;
+        final int txCnt = 1000;
 
         final IgniteKernal k = (IgniteKernal)tryLockNode;
 
@@ -358,7 +364,16 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
 
         final GridCacheContext<Object, Object> cctx = 
ctx.cacheContext(CU.cacheId(CACHE_NAME));
 
-        final AtomicBoolean stop = new AtomicBoolean();
+        GridFutureAdapter<Transaction> txReadyFut = new GridFutureAdapter<>();
+
+        long seed = System.currentTimeMillis();
+
+        Random r = new Random(seed);
+
+        log.info("Running: node0=" + 
holdLockNode.cluster().localNode().consistentId() +
+            ", node1=" + tryLockNode.cluster().localNode().consistentId() +
+            ", useTimeout=" + useTimeout +
+            ", seed=" + seed);
 
         IgniteInternalFuture<?> txFut = multithreadedAsync(new Runnable() {
             @Override public void run() {
@@ -369,10 +384,10 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
 
                     assertTrue(tx0 == null || tx0.state() == ROLLED_BACK);
 
-                    rollbackLatch.countDown();
-
                     try (Transaction tx = 
tryLockNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ,
-                        useTimeout ? 500 : 0, 1)) {
+                        useTimeout ? 50 : 0, 1)) {
+
+                        txReadyFut.onDone(tx);
 
                         // Will block on lock request until rolled back 
asynchronously.
                         Object o = tryLockNode.cache(CACHE_NAME).get(0);
@@ -384,29 +399,30 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
                     }
                 }
 
-                stop.set(true);
+                txReadyFut.onDone((Transaction)null);
             }
         }, 1, "tx-get-thread");
 
         IgniteInternalFuture<?> rollbackFut = multithreadedAsync(new 
Runnable() {
             @Override public void run() {
-                U.awaitQuiet(rollbackLatch);
-
-                doSleep(50);
-
                 Set<IgniteUuid> rolledBackVers = new HashSet<>();
 
                 int proc = 1;
 
-                while(!stop.get()) {
-                    for (Transaction tx : 
tryLockNode.transactions().localActiveTransactions()) {
+                while(true) {
+                    try {
+                        Transaction tx = txReadyFut.get();
+
+                        txReadyFut.reset();
+
+                        if (tx == null)
+                            break;
+
+                        doSleep(r.nextInt(15)); // Wait a bit to reduce chance 
of rolling back empty transactions.
+
                         if (rolledBackVers.contains(tx.xid()))
                             fail("Rollback version is expected");
 
-                        // Skip write transaction.
-                        if (LABEL.equals(tx.label()))
-                            continue;
-
                         try {
                             if (proc % 2 == 0)
                                 tx.rollback();
@@ -419,14 +435,15 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
 
                         rolledBackVers.add(tx.xid());
 
-                        if (proc % 1000 == 0)
+                        if (proc % 100 == 0)
                             log.info("Rolled back: " + proc);
 
                         proc++;
                     }
+                    catch (IgniteCheckedException e) {
+                        fail(e.getMessage());
+                    }
                 }
-
-                assertEquals("Unexpected size", txCnt, rolledBackVers.size());
             }
         }, 1, "tx-rollback-thread");
 
@@ -613,8 +630,6 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
      *
      */
     public void testMixedAsyncRollbackTypes() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-8509");
-
         final Ignite client = startClient();
 
         final AtomicBoolean stop = new AtomicBoolean();
@@ -640,134 +655,124 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
         final LongAdder failed = new LongAdder();
         final LongAdder rolledBack = new LongAdder();
 
-        IgniteInternalFuture<?> txFut = multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                while (!stop.get()) {
-                    int nodeId = r.nextInt(GRID_CNT + 1);
+        ConcurrentMap<Ignite, BlockingQueue<Transaction>> perNodeTxs = new 
ConcurrentHashMap<>();
 
-                    // Choose random node to start tx on.
-                    Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? 
client : grid(nodeId);
+        for (Ignite ignite : G.allGrids())
+            perNodeTxs.put(ignite, new ArrayBlockingQueue<>(1000));
 
-                    TransactionConcurrency conc = 
TC_VALS[r.nextInt(TC_VALS.length)];
-                    TransactionIsolation isolation = 
TI_VALS[r.nextInt(TI_VALS.length)];
+        IgniteInternalFuture<?> txFut = multithreadedAsync(() -> {
+            while (!stop.get()) {
+                int nodeId = r.nextInt(GRID_CNT + 1);
 
-                    long timeout = r.nextInt(50) + 50; // Timeout is necessary 
to prevent deadlocks.
+                // Choose random node to start tx on.
+                Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? 
client : grid(nodeId);
 
-                    try (Transaction tx = node.transactions().txStart(conc, 
isolation, timeout, txSize)) {
-                        int setSize = r.nextInt(txSize / 2) + 1;
+                TransactionConcurrency conc = 
TC_VALS[r.nextInt(TC_VALS.length)];
+                TransactionIsolation isolation = 
TI_VALS[r.nextInt(TI_VALS.length)];
 
-                        for (int i = 0; i < setSize; i++) {
-                            switch (r.nextInt(4)) {
-                                case 0:
-                                    
node.cache(CACHE_NAME).remove(r.nextInt(txSize));
+                // Timeout is necessary otherwise deadlock is possible due to 
randomness of lock acquisition.
+                long timeout = r.nextInt(50) + 50;
 
-                                    break;
+                try (Transaction tx = node.transactions().txStart(conc, 
isolation, timeout, txSize)) {
+                    BlockingQueue<Transaction> nodeQ = perNodeTxs.get(node);
 
-                                case 1:
-                                    
node.cache(CACHE_NAME).get(r.nextInt(txSize));
+                    nodeQ.put(tx);
 
-                                    break;
+                    int setSize = r.nextInt(txSize / 2) + 1;
 
-                                case 2:
-                                    final Integer v = 
(Integer)node.cache(CACHE_NAME).get(r.nextInt(txSize));
+                    for (int i = 0; i < setSize; i++) {
+                        switch (r.nextInt(4)) {
+                            case 0:
+                                
node.cache(CACHE_NAME).remove(r.nextInt(txSize));
 
-                                    
node.cache(CACHE_NAME).put(r.nextInt(txSize), (v == null ? 0 : v) + 1);
+                                break;
 
-                                    break;
+                            case 1:
+                                node.cache(CACHE_NAME).get(r.nextInt(txSize));
 
-                                case 3:
-                                    
node.cache(CACHE_NAME).put(r.nextInt(txSize), 0);
+                                break;
 
-                                    break;
+                            case 2:
+                                final Integer v = 
(Integer)node.cache(CACHE_NAME).get(r.nextInt(txSize));
 
-                                default:
-                                    fail("Unexpected opcode");
-                            }
-                        }
+                                node.cache(CACHE_NAME).put(r.nextInt(txSize), 
(v == null ? 0 : v) + 1);
 
-                        tx.commit();
+                                break;
 
-                        completed.add(1);
-                    }
-                    catch (Throwable e) {
-                        failed.add(1);
+                            case 3:
+                                node.cache(CACHE_NAME).put(r.nextInt(txSize), 
0);
+
+                                break;
+
+                            default:
+                                fail("Unexpected opcode");
+                        }
                     }
 
-                    total.add(1);
+                    tx.commit();
+
+                    completed.add(1);
                 }
+                catch (Throwable e) {
+                    failed.add(1);
+                }
+
+                total.add(1);
             }
         }, threadCnt, "tx-thread");
 
         final AtomicIntegerArray idx = new AtomicIntegerArray(GRID_CNT + 1);
 
-        IgniteInternalFuture<?> rollbackFut = multithreadedAsync(new 
Runnable() {
-            @Override public void run() {
-                int concurrentRollbackCnt = 5;
-
-                List<IgniteFuture<?>> futs = new 
ArrayList<>(concurrentRollbackCnt);
-
-                while (!stop.get()) {
-                    // Choose node randomly.
-                    final int nodeId = r.nextInt(GRID_CNT + 1);
-
-                    // Reserve node.
-                    if (!idx.compareAndSet(nodeId, 0, 1)) {
-                        yield();
+        CIX1<Transaction> rollbackClo = new CIX1<Transaction>() {
+            @Override public void applyx(Transaction tx) throws 
IgniteCheckedException {
+                try {
+                    IgniteFuture<Void> rollbackFut = tx.rollbackAsync();
 
-                        continue;
-                    }
+                    rollbackFut.listen(new 
IgniteInClosure<IgniteFuture<Void>>() {
+                        @Override public void apply(IgniteFuture<Void> fut) {
+                            tx.close();
+                        }
+                    });
+                }
+                catch (Throwable t) {
+                    log.error("Exception on async rollback", t);
 
-                    Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? 
client : grid(nodeId);
+                    throw new IgniteCheckedException("Rollback failed", t);
+                }
+            }
+        };
 
-                    Collection<Transaction> transactions = 
node.transactions().localActiveTransactions();
+        IgniteInternalFuture<?> rollbackFut = multithreadedAsync(() -> {
+            while (!interrupted()) {
+                // Choose node randomly.
+                final int nodeId = r.nextInt(GRID_CNT + 1);
 
-                    for (Transaction tx : transactions) {
-                        rolledBack.add(1);
+                // Reserve node for rollback.
+                if (!idx.compareAndSet(nodeId, 0, 1)) {
+                    yield();
 
-                        if (rolledBack.sum() % 1000 == 0)
-                            info("Processed: " + rolledBack.sum());
+                    continue;
+                }
 
-                        try {
-                            IgniteFuture<Void> rollbackFut = 
tx.rollbackAsync();
+                Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? 
client : grid(nodeId);
 
-                            rollbackFut.listen(new 
IgniteInClosure<IgniteFuture<Void>>() {
-                                @Override public void apply(IgniteFuture<Void> 
fut) {
-                                    tx.close();
-                                }
-                            });
+                BlockingQueue<Transaction> nodeQ = perNodeTxs.get(node);
 
-                            futs.add(rollbackFut);
-                        }
-                        catch (Throwable t) {
-                            log.error("Exception on async rollback", t);
+                Transaction tx;
 
-                            fail("Exception is not expected");
-                        }
+                // Roll back all transactions queued for this node.
+                while((tx = nodeQ.poll()) != null) {
+                    rolledBack.add(1);
 
-                        if (futs.size() == concurrentRollbackCnt) {
-                            for (IgniteFuture<?> fut : futs)
-                                try {
-                                    fut.get();
-                                }
-                                catch (IgniteException e) {
-                                    log.warning("Future was rolled back with 
error", e);
-                                }
+                    doSleep(r.nextInt(50)); // Add random sleep to increase 
completed txs count.
 
-                            futs.clear();
-                        }
-                    }
+                    if (rolledBack.sum() % 1000 == 0)
+                        info("Rolled back so far: " + rolledBack.sum());
 
-                    idx.set(nodeId, 0);
+                    rollbackClo.apply(tx);
                 }
 
-                for (IgniteFuture<?> fut : futs)
-                    try {
-                        fut.get();
-                    }
-                    catch (Throwable t) {
-                        // No-op.
-                    }
-
+                idx.set(nodeId, 0);
             }
         }, 3, "rollback-thread"); // Rollback by multiple threads.
 
@@ -775,9 +780,27 @@ public class TxRollbackAsyncTest extends 
GridCommonAbstractTest {
 
         stop.set(true);
 
-        txFut.get();
+        txFut.get(); // Stop tx generation.
 
-        rollbackFut.get();
+        rollbackFut.cancel();
+
+        try {
+            rollbackFut.get();
+        }
+        catch (IgniteFutureCancelledCheckedException e) {
+            // Expected.
+        }
+
+        // Rollback remaining transactions.
+        for (BlockingQueue<Transaction> queue : perNodeTxs.values()) {
+            Transaction tx;
+
+            while((tx = queue.poll()) != null) {
+                rolledBack.add(1);
+
+                rollbackClo.apply(tx);
+            }
+        }
 
         log.info("total=" + total.sum() + ", completed=" + completed.sum() + 
", failed=" + failed.sum() +
             ", rolledBack=" + rolledBack.sum());

Reply via email to