Repository: ignite Updated Branches: refs/heads/ignite-3478 3d1616141 -> 4c4603c06
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c4603c0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c4603c0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c4603c0 Branch: refs/heads/ignite-3478 Commit: 4c4603c0605a73a5616702e8f3e0fa38d6d98846 Parents: 3d16161 Author: sboikov <[email protected]> Authored: Wed Sep 20 16:17:14 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 20 16:17:14 2017 +0300 ---------------------------------------------------------------------- .../cache/mvcc/CacheMvccTransactionsTest.java | 94 ++++++++++++++++++++ 1 file changed, 94 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4c4603c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 28511cf..cf3bafb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; @@ -1418,6 +1419,99 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * Checks that a {@code getAll} started while the client {@link GridNearGetRequest} messages are blocked + * returns 20 entries whose values are all non-null and all equal to each other. The main thread meanwhile + * stops node 0 and overwrites the same keys in ten further transactions. + * <p> + * Steps: start four server nodes and one client node; create the cache from the client with a + * {@link TestCacheNodeExcludingFilter} built from the instance names of nodes 0 and 1; collect 10 primary + * keys of node 2 and 10 of node 3 and store -1 for each of them in one transaction; block get requests on + * the client communication SPI and run {@code getAll} in a background task; stop node 0; run ten + * {@code putAll} transactions writing the values 0..9; a second background task sleeps 3000 ms and then + * calls {@code stopBlock(true)}; finally both background futures are awaited. + * <p> + * NOTE(review): the test name suggests that node 0 acts as the coordinator; this method does not show how + * the coordinator is chosen - confirm against the test configuration. + * + * @throws Exception If failed. + */ + public void testReadInProgressCoordinatorFails() throws Exception { + testSpi = true; + + startGrids(4); + + client = true; + + final Ignite client = startGrid(4); + + final IgniteCache cache = client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1)))); + + final Set<Integer> keys = new HashSet<>(); + + List<Integer> keys1 = primaryKeys(jcache(2), 10); + + keys.addAll(keys1); + keys.addAll(primaryKeys(jcache(3), 10)); + + Map<Integer, Integer> vals = new HashMap(); + + for (Integer key : keys) + vals.put(key, -1); + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + + final TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client); + + clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridNearGetRequest; + } + }); + + IgniteInternalFuture getFut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + Map<Integer, Integer> res = cache.getAll(keys); + + assertEquals(20, res.size()); + + Integer val = null; + + for (Integer val0 : res.values()) { + assertNotNull(val0); + + if (val == null) + val = val0; + else + assertEquals(val, val0); + } + + return null; + } + }, "get-thread"); + + clientSpi.waitForBlocked(); + + /* NOTE(review): this task only releases the blocked messages, yet it is started under the same name (get-thread) as the reading task above - a distinct name was probably intended. */ final IgniteInternalFuture releaseWaitFut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { +
Thread.sleep(3000); + + clientSpi.stopBlock(true); + + return null; + } + }, "get-thread"); + + /* NOTE(review): presumably meant to happen while the get requests are still blocked, i.e. within the 3000 ms sleep of the release task - nothing here enforces that ordering. */ stopGrid(0); + + for (int i = 0; i < 10; i++) { + vals = new HashMap(); + + for (Integer key : keys) + vals.put(key, i); + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + } + + releaseWaitFut.get(); + getFut.get(); + } + + /** + * @param N Number of object to update in single transaction. * @param srvs Number of server nodes. * @param clients Number of client nodes.
