ignite-3478

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e4494600
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e4494600
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e4494600

Branch: refs/heads/ignite-3479
Commit: e449460082f292e54060dcc7786ffa03986b3890
Parents: b72f362
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Sep 22 13:26:56 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Sep 22 13:26:56 2017 +0300

----------------------------------------------------------------------
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 121 ++++++++++++++++++-
 1 file changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e4494600/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index c50d63c..e5eb0ee 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -702,8 +702,6 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
         assertEquals(5, (Object)vals.get(key2));
     }
 
-
-
     /**
      * @throws Exception If failed.
      */
@@ -812,6 +810,125 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCleanupWaitsForGet3() throws Exception {
+        /*
+        Simulate case when coordinator assigned query version has active 
transaction,
+        query is delayed, after this active transaction finish and the same 
key is
+        updated several more times before query starts.
+         */
+        testSpi = true;
+
+        client = false;
+
+        startGrids(1);
+
+        client = true;
+
+        final Ignite client = startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        final IgniteCache<Object, Object> cache = 
client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 16));
+
+        final Integer key1 = 1;
+        final Integer key2 = 2;
+
+        for (int i = 0; i < 3; i++) {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                cache.put(key1, i);
+                cache.put(key2, i);
+
+                tx.commit();
+            }
+        }
+
+        TestRecordingCommunicationSpi crdSpi = 
TestRecordingCommunicationSpi.spi(grid(0));
+
+        TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(client);
+
+        clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            /** */
+            private boolean blocked;
+
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (!blocked && (msg instanceof CoordinatorTxAckRequest)) {
+                    blocked = true;
+
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        final IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = 
client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key2, 3);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "put");
+
+        clientSpi.waitForBlocked();
+
+        for (int i = 0; i < 3; i++) {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                cache.put(key1, i + 3);
+
+                tx.commit();
+            }
+        }
+
+        // Delay version for getAll.
+        crdSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            /** */
+            private boolean blocked;
+
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (!blocked && (msg instanceof 
MvccCoordinatorVersionResponse)) {
+                    blocked = true;
+
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        final IgniteInternalFuture<?> getFut = GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                Map<Object, Object> res = cache.getAll(F.asSet(key1, key2));
+
+                assertEquals(2, res.size());
+
+                return null;
+            }
+        }, "get");
+
+        crdSpi.waitForBlocked();
+
+        clientSpi.stopBlock(true);
+
+        putFut.get();
+
+        for (int i = 0; i < 3; i++) {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                cache.put(key2, i + 4);
+
+                tx.commit();
+            }
+        }
+
+        crdSpi.stopBlock(true);
+
+        getFut.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testPutAllGetAll_SingleNode() throws Exception {
         putAllGetAll(1, 0, 0, 64);
     }

Reply via email to