Repository: ignite Updated Branches: refs/heads/ignite-3478 7f4defd09 -> 761e43d30
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 6b01aef..1b70747 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -55,11 +56,15 @@ import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -68,6 +73,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -79,6 +85,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA /** * TODO IGNITE-3478: extend tests to use single/mutiple nodes, all tx types. * TODO IGNITE-3478: test with cache groups. + * TODO IGNITE-3478: add check for cleanup in all test (at the and do update for all keys, check there are 2 versions left). */ @SuppressWarnings("unchecked") public class CacheMvccTransactionsTest extends GridCommonAbstractTest { @@ -89,6 +96,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { private static final int DFLT_PARTITION_COUNT = RendezvousAffinityFunction.DFLT_PARTITION_COUNT; /** */ + private static final String CRD_ATTR = "testCrd"; + + /** */ private static final long DFLT_TEST_TIME = 30_000; /** */ @@ -100,6 +110,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** */ private boolean testSpi; + /** */ + private String nodeAttr; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -115,6 +128,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { cfg.setClientMode(client); + if (nodeAttr != null) + cfg.setUserAttributes(F.asMap(nodeAttr, true)); + return cfg; } @@ -124,6 +140,13 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + CacheCoordinatorsProcessor.coordinatorAssignClosure(null); + } + + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { try { verifyCoordinatorInternalState(); @@ -131,6 +154,10 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { finally { stopAllGrids(); } + + CacheCoordinatorsProcessor.coordinatorAssignClosure(null); + + super.afterTest(); } /** @@ -491,7 +518,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { client = true; - final Ignite ignite = startGrid(3); + final Ignite ignite = startGrid(2); awaitPartitionMapExchange(); @@ -549,7 +576,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { if (i % 2 == 1) { // Execute one more update to increase counter. try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(1000_0000, 1); + cache.put(primaryKeys(jcache(0), 1, 100_000).get(0), 1); tx.commit(); } @@ -956,38 +983,46 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode() throws Exception { - putAllGetAll(1, 0, 0, 64); + putAllGetAll(false, 1, 0, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode_SinglePartition() throws Exception { - putAllGetAll(1, 0, 0, 1); + putAllGetAll(false, 1, 0, 0, 1); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups0() throws Exception { - putAllGetAll(4, 2, 0, 64); + putAllGetAll(false, 4, 2, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups1() throws Exception { - putAllGetAll(4, 2, 1, 64); + putAllGetAll(false, 4, 2, 1, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups2() throws Exception { - putAllGetAll(4, 2, 2, 64); + putAllGetAll(false, 4, 2, 2, 64); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator() throws Exception { + putAllGetAll(true, 4, 2, 1, 64); } /** + * @param restartCrd Coordinator restart flag. * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -995,6 +1030,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void putAllGetAll( + boolean restartCrd, final int srvs, final int clients, int cacheBackups, @@ -1118,7 +1154,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + restartCrd, + srvs, clients, cacheBackups, cacheParts, @@ -1128,6 +1166,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { null, writer, reader); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); } /** @@ -1349,7 +1390,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + false, + srvs, clients, cacheBackups, cacheParts, @@ -1486,7 +1529,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + false, + srvs, clients, cacheBackups, cacheParts, @@ -1535,6 +1580,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { stopGrid(1); + checkActiveQueriesCleanup(ignite(0)); + verifyCoordinatorInternalState(); try { @@ -1671,43 +1718,79 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** - * TODO IGNITE-3478. - * * @throws Exception If failed. */ - public void _testReadInProgressCoordinatorFails() throws Exception { + public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception { + for (int i = 1; i <= 3; i++) { + readInProgressCoordinatorFailsSimple(false, i); + + afterTest(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception { + for (int i = 1; i <= 3; i++) { + readInProgressCoordinatorFailsSimple(true, i); + + afterTest(); + } + } + + /** + * @param fromClient {@code True} if read from client node, otherwise from server node. + * @param crdChangeCnt Number of coordinator changes. + * @throws Exception If failed. + */ + private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt) throws Exception { + info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient + ", crdChangeCnt=" + crdChangeCnt + ']'); + testSpi = true; - startGrids(4); + client = false; + + final int SRVS = 3; + final int COORDS = crdChangeCnt + 1; + + startGrids(SRVS + COORDS); client = true; - final Ignite client = startGrid(4); + assertTrue(startGrid(SRVS + COORDS).configuration().isClientMode()); - final IgniteCache cache = client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). - setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1)))); + final Ignite getNode = fromClient ? ignite(SRVS + COORDS) : ignite(COORDS); + + String[] excludeNodes = new String[COORDS]; + + for (int i = 0; i < COORDS; i++) + excludeNodes[i] = testNodeName(i); + + final IgniteCache cache = getNode.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setNodeFilter(new TestCacheNodeExcludingFilter(excludeNodes))); final Set<Integer> keys = new HashSet<>(); - List<Integer> keys1 = primaryKeys(jcache(2), 10); + List<Integer> keys1 = primaryKeys(jcache(COORDS), 10); keys.addAll(keys1); - keys.addAll(primaryKeys(jcache(3), 10)); + keys.addAll(primaryKeys(jcache(COORDS + 1), 10)); Map<Integer, Integer> vals = new HashMap(); for (Integer key : keys) vals.put(key, -1); - try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.putAll(vals); tx.commit(); } - final TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client); + final TestRecordingCommunicationSpi getNodeSpi = TestRecordingCommunicationSpi.spi(getNode); - clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + getNodeSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { @Override public boolean apply(ClusterNode node, Message msg) { return msg instanceof GridNearGetRequest; } @@ -1734,25 +1817,154 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }, "get-thread"); - clientSpi.waitForBlocked(); + getNodeSpi.waitForBlocked(); - final IgniteInternalFuture releaseWaitFut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - Thread.sleep(3000); + for (int i = 0; i < crdChangeCnt; i++) + stopGrid(i); - clientSpi.stopBlock(true); + for (int i = 0; i < 10; i++) { + vals = new HashMap(); + + for (Integer key : keys) + vals.put(key, i); + + try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + } + + getNodeSpi.stopBlock(true); + + getFut.get(); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); + } + + /** + * @throws Exception If failed. + */ + public void testCoordinatorChangeActiveQueryClientFails_Simple() throws Exception { + testSpi = true; + + client = false; + + final int SRVS = 3; + final int COORDS = 1; + + startGrids(SRVS + COORDS); + + client = true; + + Ignite client = startGrid(SRVS + COORDS); + + final IgniteCache cache = client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setNodeFilter(new TestCacheNodeExcludingFilter(testNodeName(0)))); + + final Map<Integer, Integer> vals = new HashMap(); + + for (int i = 0; i < 100; i++) + vals.put(i, i); + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + + final TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client); + + clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof GridNearGetRequest; + } + }); + + IgniteInternalFuture getFut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + cache.getAll(vals.keySet()); return null; } }, "get-thread"); + clientSpi.waitForBlocked(); + stopGrid(0); - for (int i = 0; i < 10; i++) { - vals = new HashMap(); + stopGrid(client.name()); - for (Integer key : keys) - vals.put(key, i); + try { + getFut.get(); + + fail(); + } + catch (Exception ignore) { + // No-op. + } + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); + } + + /** + * @throws Exception If failed. + */ + public void testReadInProgressCoordinatorFails() throws Exception { + readInProgressCoordinatorFails(false); + } + + /** + * @throws Exception If failed. + */ + public void testReadInProgressCoordinatorFails_ReadDelay() throws Exception { + readInProgressCoordinatorFails(true); + } + + /** + * @param readDelay {@code True} if delays get requests. + * @throws Exception If failed. + */ + private void readInProgressCoordinatorFails(boolean readDelay) throws Exception { + final int COORD_NODES = 5; + final int SRV_NODES = 4; + + if (readDelay) + testSpi = true; + + startGrids(COORD_NODES); + + startGridsMultiThreaded(COORD_NODES, SRV_NODES); + + client = true; + + Ignite client = startGrid(COORD_NODES + SRV_NODES); + + final List<String> cacheNames = new ArrayList<>(); + + final int KEYS = 100; + + final Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < KEYS; i++) + vals.put(i, 0); + + String[] exclude = new String[COORD_NODES]; + + for (int i = 0; i < COORD_NODES; i++) + exclude[i] = testNodeName(i); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + ccfg.setName("cache-" + cacheNames.size()); + + // First server nodes are 'dedicated' coordinators. + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude)); + + cacheNames.add(ccfg.getName()); + + IgniteCache cache = client.createCache(ccfg); try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.putAll(vals); @@ -1761,8 +1973,372 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } } - releaseWaitFut.get(); - getFut.get(); + if (readDelay) { + for (int i = COORD_NODES; i < COORD_NODES + SRV_NODES + 1; i++) { + TestRecordingCommunicationSpi.spi(ignite(i)).closure(new IgniteBiInClosure<ClusterNode, Message>() { + @Override public void apply(ClusterNode node, Message msg) { + if (msg instanceof GridNearGetRequest) + doSleep(ThreadLocalRandom.current().nextLong(50) + 1); + } + }); + } + } + + final AtomicBoolean done = new AtomicBoolean(); + + try { + final AtomicInteger readNodeIdx = new AtomicInteger(0); + + IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + Ignite node = ignite(COORD_NODES + (readNodeIdx.getAndIncrement() % (SRV_NODES + 1))); + + int cnt = 0; + + while (!done.get()) { + for (String cacheName : cacheNames) { + IgniteCache cache = node.cache(cacheName); + + Map<Integer, Integer> res = cache.getAll(vals.keySet()); + + assertEquals(vals.size(), res.size()); + + Integer val0 = null; + + for (Integer val : res.values()) { + if (val0 == null) + val0 = val; + else + assertEquals(val0, val); + } + } + + cnt++; + } + + log.info("Finished [node=" + node.name() + ", readCnt=" + cnt + ']'); + + return null; + } + catch (Throwable e) { + error("Unexpected error: " + e, e); + + throw e; + } + } + }, (SRV_NODES + 1) + 1, "get-thread"); + + IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + Ignite node = ignite(COORD_NODES); + + List<IgniteCache> caches = new ArrayList<>(); + + for (String cacheName : cacheNames) + caches.add(node.cache(cacheName)); + + Integer val = 1; + + while (!done.get()) { + Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < KEYS; i++) + vals.put(i, val); + + for (IgniteCache cache : caches) { + try { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + } + catch (ClusterTopologyException e) { + info("Tx failed: " + e); + } + } + + val++; + } + + return null; + } + }, "putAll-thread"); + + IgniteInternalFuture putFut2 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + Ignite node = ignite(COORD_NODES); + + IgniteCache cache = node.cache(cacheNames.get(0)); + + Integer val = 0; + + while (!done.get()) { + try { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(Integer.MAX_VALUE, val); + + tx.commit(); + } + } + catch (ClusterTopologyException e) { + info("Tx failed: " + e); + } + + val++; + } + + return null; + } + }, "put-thread"); + + for (int i = 0; i < COORD_NODES && !getFut.isDone(); i++) { + U.sleep(3000); + + stopGrid(i); + + awaitPartitionMapExchange(); + } + + done.set(true); + + getFut.get(); + putFut1.get(); + putFut2.get(); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); + } + finally { + done.set(true); + } + + } + + /** + * @throws Exception If failed. + */ + public void testMvccCoordinatorChangeSimple() throws Exception { + Ignite srv0 = startGrid(0); + + final List<String> cacheNames = new ArrayList<>(); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + ccfg.setName("cache-" + cacheNames.size()); + + cacheNames.add(ccfg.getName()); + + srv0.createCache(ccfg); + } + + checkPutGet(cacheNames); + + for (int i = 0; i < 3; i++) { + startGrid(i + 1); + + checkPutGet(cacheNames); + + checkCoordinatorsConsistency(null); + } + + client = true; + + for (int i = 0; i < 3; i++) { + Ignite node = startGrid(i + 4); + + // Init client caches outside of transactions. + for (String cacheName : cacheNames) + node.cache(cacheName); + + checkPutGet(cacheNames); + + checkCoordinatorsConsistency(null); + } + + for (int i = 0; i < 3; i++) { + stopGrid(i); + + awaitPartitionMapExchange(); + + checkPutGet(cacheNames); + + checkCoordinatorsConsistency(null); + } + } + + /** + * @param cacheNames Cache names. + */ + private void checkPutGet(List<String> cacheNames) { + List<Ignite> nodes = G.allGrids(); + + assertFalse(nodes.isEmpty()); + + Ignite putNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); + + Map<Integer, Integer> vals = new HashMap(); + + Integer val = ThreadLocalRandom.current().nextInt(); + + for (int i = 0; i < 10; i++) + vals.put(i, val); + + try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (String cacheName : cacheNames) + putNode.cache(cacheName).putAll(vals); + + tx.commit(); + } + + for (Ignite node : nodes) { + for (String cacheName : cacheNames) { + Map<Object, Object> res = node.cache(cacheName).getAll(vals.keySet()); + + assertEquals(vals, res); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testMvccCoordinatorInfoConsistency() throws Exception { + for (int i = 0; i < 4; i++) { + startGrid(i); + + checkCoordinatorsConsistency(i + 1); + } + + client = true; + + startGrid(4); + + checkCoordinatorsConsistency(5); + + startGrid(5); + + checkCoordinatorsConsistency(6); + + client = false; + + stopGrid(0); + + checkCoordinatorsConsistency(5); + } + + /** + * @param expNodes Expected nodes number. + */ + private void checkCoordinatorsConsistency(@Nullable Integer expNodes) { + List<Ignite> nodes = G.allGrids(); + + if (expNodes != null) + assertEquals(expNodes, (Integer)nodes.size()); + + MvccCoordinator crd = null; + + for (Ignite node : G.allGrids()) { + CacheCoordinatorsProcessor crdProc = ((IgniteKernal) node).context().cache().context().coordinators(); + + MvccCoordinator crd0 = crdProc.currentCoordinator(); + + if (crd != null) + assertEquals(crd, crd0); + else + crd = crd0; + } + } + + /** + * @throws Exception If failed. + */ + public void testGetVersionRequestFailover() throws Exception { + final int NODES = 5; + + testSpi = true; + + startGridsMultiThreaded(NODES - 1); + + client = true; + + Ignite client = startGrid(NODES - 1); + + final List<String> cacheNames = new ArrayList<>(); + + final Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < 100; i++) + vals.put(i, i); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + ccfg.setName("cache-" + cacheNames.size()); + + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0))); + + cacheNames.add(ccfg.getName()); + + IgniteCache cache = client.createCache(ccfg); + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + } + + final AtomicInteger nodeIdx = new AtomicInteger(1); + + final AtomicBoolean done = new AtomicBoolean(); + + try { + IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite node = ignite(nodeIdx.getAndIncrement()); + + int cnt = 0; + + while (!done.get()) { + for (String cacheName : cacheNames) { + IgniteCache cache = node.cache(cacheName); + + Map<Integer, Integer> res = cache.getAll(vals.keySet()); + + assertEquals(vals, res); + } + + cnt++; + } + + log.info("Finished [node=" + node.name() + ", cnt=" + cnt + ']'); + + return null; + } + }, NODES - 1, "get-thread"); + + doSleep(1000); + + TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(ignite(0)); + + crdSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof MvccCoordinatorVersionResponse; + } + }); + + crdSpi.waitForBlocked(); + + stopGrid(0); + + doSleep(1000); + + done.set(true); + + getFut.get(); + } + finally { + done.set(true); + } } /** @@ -1899,7 +2475,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + false, + srvs, clients, cacheBackups, cacheParts, @@ -1925,6 +2503,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void readWriteTest( + final boolean restartCrd, final int srvs, final int clients, int cacheBackups, @@ -1935,18 +2514,36 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { IgniteInClosure<IgniteCache<Object, Object>> init, final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer, final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader) throws Exception { + if (restartCrd) + CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure()); + Ignite srv0 = startGridsMultiThreaded(srvs); if (clients > 0) { client = true; startGridsMultiThreaded(srvs, clients); + + client = false; } - IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration(PARTITIONED, + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, cacheBackups, - cacheParts)); + cacheParts); + + if (restartCrd) + ccfg.setNodeFilter(new CoordinatorNodeFilter()); + + IgniteCache<Object, Object> cache = srv0.createCache(ccfg); + + int crdIdx = srvs + clients; + + if (restartCrd) { + nodeAttr = CRD_ATTR; + + startGrid(crdIdx); + } if (init != null) init.apply(cache); @@ -1974,6 +2571,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { writer.apply(idx, caches, stop); } catch (Throwable e) { + if (restartCrd && X.hasCause(e, ClusterTopologyException.class)) { + log.info("Writer error: " + e); + + return null; + } + error("Unexpected error: " + e, e); stop.set(true); @@ -2006,9 +2609,24 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }, readers, "reader"); - while (System.currentTimeMillis() < stopTime && !stop.get()) + while (System.currentTimeMillis() < stopTime && !stop.get()) { Thread.sleep(1000); + if (restartCrd) { + log.info("Start new coordinator: " + (crdIdx + 1)); + + startGrid(crdIdx + 1); + + log.info("Stop current coordinator: " + crdIdx); + + stopGrid(crdIdx); + + crdIdx++; + + awaitPartitionMapExchange(); + } + } + stop.set(true); writeFut.get(); @@ -2095,7 +2713,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { */ private void verifyCoordinatorInternalState() throws Exception { for (Ignite node : G.allGrids()) { - final CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators(); + final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators(); Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs"); @@ -2119,16 +2737,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void checkActiveQueriesCleanup(Ignite node) throws Exception { - final CacheCoordinatorsSharedManager crd = ((IgniteKernal)node).context().cache().context().coordinators(); + final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators(); + + assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition( + new GridAbsPredicate() { + @Override public boolean apply() { + Object activeQueries = GridTestUtils.getFieldValue(crd, "activeQueries"); + + synchronized (activeQueries) { + Long minQry = GridTestUtils.getFieldValue(activeQueries, "minQry"); + + if (minQry != null) + log.info("Min query: " + minQry); + + Map<Object, Map> queriesMap = GridTestUtils.getFieldValue(activeQueries, "activeQueries"); + + boolean empty = true; + + for (Map.Entry<Object, Map> e : queriesMap.entrySet()) { + if (!e.getValue().isEmpty()) { + empty = false; - assertTrue(GridTestUtils.waitForCondition( + log.info("Active queries: " + e); + } + } + + return empty && minQry == null; + } + } + }, 8_000) + ); + + assertTrue("Previous coordinator queries not empty: " + node.name(), GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries"); + Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries"); + Boolean prevDone = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone"); - return activeQrys.isEmpty(); + if (!queries.isEmpty() || !prevDone) + log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']'); + + return queries.isEmpty(); } - }, 5000) + }, 8_000) ); } @@ -2203,4 +2854,31 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** */ SCAN } + + /** + * + */ + static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return node.attribute(CRD_ATTR) == null; + } + } + + /** + * + */ + static class CoordinatorAssignClosure implements IgniteClosure<Collection<ClusterNode>, ClusterNode> { + @Override public ClusterNode apply(Collection<ClusterNode> clusterNodes) { + for (ClusterNode node : clusterNodes) { + if (node.attribute(CRD_ATTR) != null) { + assert !CU.clientNode(node) : node; + + return node; + } + } + + return null; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 64070d1..56d09f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -51,7 +51,6 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index 5bbf575..39183b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -52,7 +52,6 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java index d16e525..a427c63 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java @@ -67,7 +67,6 @@ public class MetadataStoragePageMemoryImplTest extends MetadataStorageSelfTest{ null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index bd849b1..467ede4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -56,7 +56,6 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 37422fb..c5997fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -79,7 +79,6 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index ee43309..6a1d4f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager; @@ -65,7 +64,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { ctx, new GridCacheSharedContext<>( ctx, - new CacheCoordinatorsSharedManager(), new IgniteTxManager(), new GridCacheVersionManager(), new GridCacheMvccManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 4965d16..094d14c 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1453,6 +1453,14 @@ public abstract class GridAbstractTest extends TestCase { } /** + * @param idx Index of the Ignite instance. + * @return Indexed Ignite instance name. + */ + protected String testNodeName(int idx) { + return getTestIgniteInstanceName(idx); + } + + /** * Parses test Ignite instance index from test Ignite instance name. * * @param testIgniteInstanceName Test Ignite instance name, returned by {@link #getTestIgniteInstanceName(int)}.
