Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 62dbb11d5 -> 6d747761e


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d747761
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d747761
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d747761

Branch: refs/heads/ignite-3478
Commit: 6d747761eeb3c4c8d29e42d61bb0e0ffa2fdac10
Parents: 62dbb11
Author: sboikov <[email protected]>
Authored: Mon Sep 18 13:11:00 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Sep 18 15:39:34 2017 +0300

----------------------------------------------------------------------
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 210 ++++++++++++++++++-
 1 file changed, 204 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6d747761/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 002da40..99ce163 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -113,9 +114,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        verifyCoordinatorInternalState();
-
-        stopAllGrids();
+        try {
+            verifyCoordinatorInternalState();
+        }
+        finally {
+            stopAllGrids();
+        }
     }
 
     /**
@@ -1124,6 +1128,164 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed
+     */
+    public void testOperationsSequenceConsistency_SingleNode() throws Exception {
+        operationsSequenceConsistency(1, 0, 0, 64);
+    }
+
+    /**
+     * TODO IGNITE-3478: enable when scan is fully implemented.
+     *
+     * @throws Exception If failed
+     */
+//    public void testOperationsSequenceConsistency_ClientServer_Backups0() throws Exception {
+//        operationsSequenceConsistency(4, 2, 0, 64);
+//    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param clients Number of client nodes.
+     * @param cacheBackups Number of cache backups.
+     * @param cacheParts Number of cache partitions.
+     * @throws Exception If failed.
+     */
+    private void operationsSequenceConsistency(
+        final int srvs,
+        final int clients,
+        int cacheBackups,
+        int cacheParts
+    )
+        throws Exception
+    {
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final long time = 10_000;
+
+        final AtomicInteger keyCntr = new AtomicInteger();
+
+        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        IgniteCache<Integer, Value> cache = randomCache(caches, rnd);
+                        IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                        Integer key = keyCntr.incrementAndGet();
+
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            cache.put(key, new Value(idx, cnt++));
+
+                            tx.commit();
+                        }
+
+                        if (key > 1_000_000)
+                            break;
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (!stop.get()) {
+                        IgniteCache<Integer, Value> cache = randomCache(caches, rnd);
+
+                        Map<Integer, TreeSet<Integer>> vals = new HashMap<>();
+
+                        for (IgniteCache.Entry<Integer, Value> e : cache) {
+                            Value val = e.getValue();
+
+                            assertNotNull(val);
+
+                            TreeSet<Integer> cntrs = vals.get(val.key);
+
+                            if (cntrs == null)
+                                vals.put(val.key, cntrs = new TreeSet<>());
+
+                            boolean add = cntrs.add(val.cnt);
+
+                            assertTrue(add);
+                        }
+
+                        for (TreeSet<Integer> readCntrs : vals.values()) {
+                            for (int i = 0; i < readCntrs.size(); i++)
+                                assertTrue(readCntrs.contains(i));
+                        }
+                    }
+                }
+            };
+
+        readWriteTest(srvs,
+            clients,
+            cacheBackups,
+            cacheParts,
+            writers,
+            readers,
+            time,
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testActiveQueryCleanupOnNodeFailure() throws Exception {
+        testSpi = true;
+
+        final Ignite srv = startGrid(0);
+
+        srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1024));
+
+        client = true;
+
+        final Ignite client = startGrid(1);
+
+        TestRecordingCommunicationSpi srvSpi = TestRecordingCommunicationSpi.spi(srv);
+
+        srvSpi.blockMessages(GridNearGetResponse.class, getTestIgniteInstanceName(1));
+
+        TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class,
+            getTestIgniteInstanceName(0));
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+                cache.getAll(F.asSet(1, 2, 3));
+
+                return null;
+            }
+        });
+
+        srvSpi.waitForBlocked();
+
+        assertFalse(fut.isDone());
+
+        stopGrid(1);
+
+        verifyCoordinatorInternalState();
+
+        try {
+            fut.get();
+        }
+        catch (Exception ignore) {
+            // No-op.
+        }
+    }
+
+    /**
      * @param N Number of object to update in single transaction.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
@@ -1450,11 +1612,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * @throws Exception If failed.
      */
-    private void verifyCoordinatorInternalState() {
+    private void verifyCoordinatorInternalState() throws Exception {
         for (Ignite node : G.allGrids()) {
-            CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators();
+            final CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators();
 
             Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs");
 
@@ -1467,6 +1629,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             Map ackFuts = GridTestUtils.getFieldValue(crd, "ackFuts");
 
             assertTrue(ackFuts.isEmpty());
+
+            // TODO IGNITE-3478
+//            assertTrue(GridTestUtils.waitForCondition(
+//                new GridAbsPredicate() {
+//                    @Override public boolean apply() {
+//                        Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries");
+//
+//                        return activeQrys.isEmpty();
+//                    }
+//                }, 5000)
+//            );
         }
     }
 
@@ -1509,6 +1682,31 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      *
      */
+    static class Value {
+        /** */
+        int key;
+
+        /** */
+        int cnt;
+
+        /**
+         * @param key Key.
+         * @param cnt Update count.
+         */
+        Value(int key, int cnt) {
+            this.key = key;
+            this.cnt = cnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Value.class, this);
+        }
+    }
+
+    /**
+     *
+     */
     enum ReadMode {
         /** */
         GET_ALL,

Reply via email to