Repository: ignite Updated Branches: refs/heads/ignite-3479 f4f21b729 -> d86ab340d
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d86ab340 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d86ab340 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d86ab340 Branch: refs/heads/ignite-3479 Commit: d86ab340d11f27a59f97b7755a6d5f7ec5daaae7 Parents: f4f21b7 Author: sboikov <[email protected]> Authored: Fri Sep 29 14:12:56 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 29 14:12:56 2017 +0300 ---------------------------------------------------------------------- .../cache/mvcc/CacheCoordinatorsProcessor.java | 30 +++- .../cache/mvcc/CacheMvccTransactionsTest.java | 136 +++++++++++++++++-- 2 files changed, 149 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d86ab340/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index 753ee33..ac55164 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; @@ -125,6 +126,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { /** */ private CacheCoordinatorsDiscoveryData discoData = new CacheCoordinatorsDiscoveryData(null); + /** For tests only. */ + private static IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC; + /** * @param ctx Context. */ @@ -178,6 +182,15 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** + * For testing only. + * + * @param crdC Closure assigning coordinator. + */ + static void coordinatorAssignClosure(IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC) { + CacheCoordinatorsProcessor.crdC = crdC; + } + + /** * @param evtType Event type. * @param nodes Current nodes. * @param topVer Topology version. @@ -197,12 +210,19 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) { ClusterNode crdNode = null; - // Expect nodes are sorted by order. - for (ClusterNode node : nodes) { - if (!CU.clientNode(node)) { - crdNode = node; + if (crdC != null) { + crdNode = crdC.apply(nodes); - break; + log.info("Assigned coordinator using test closure: " + crd); + } + else { + // Expect nodes are sorted by order. + for (ClusterNode node : nodes) { + if (!CU.clientNode(node)) { + crdNode = node; + + break; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d86ab340/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index ac16833..1b70747 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -55,11 +56,15 @@ import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -91,6 +96,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { private static final int DFLT_PARTITION_COUNT = RendezvousAffinityFunction.DFLT_PARTITION_COUNT; /** */ + private static final String CRD_ATTR = "testCrd"; + + /** */ private static final long DFLT_TEST_TIME = 30_000; /** */ @@ -102,6 +110,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** */ private boolean testSpi; + /** */ + private String nodeAttr; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -117,6 +128,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { cfg.setClientMode(client); + if (nodeAttr != null) + cfg.setUserAttributes(F.asMap(nodeAttr, true)); + return cfg; } @@ -126,6 +140,13 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + CacheCoordinatorsProcessor.coordinatorAssignClosure(null); + } + + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { try { verifyCoordinatorInternalState(); @@ -133,6 +154,10 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { finally { stopAllGrids(); } + + CacheCoordinatorsProcessor.coordinatorAssignClosure(null); + + super.afterTest(); } /** @@ -958,38 +983,46 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode() throws Exception { - putAllGetAll(1, 0, 0, 64); + putAllGetAll(false, 1, 0, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_SingleNode_SinglePartition() throws Exception { - putAllGetAll(1, 0, 0, 1); + putAllGetAll(false, 1, 0, 0, 1); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups0() throws Exception { - putAllGetAll(4, 2, 0, 64); + putAllGetAll(false, 4, 2, 0, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups1() throws Exception { - putAllGetAll(4, 2, 1, 64); + putAllGetAll(false, 4, 2, 1, 64); } /** * @throws Exception If failed. */ public void testPutAllGetAll_ClientServer_Backups2() throws Exception { - putAllGetAll(4, 2, 2, 64); + putAllGetAll(false, 4, 2, 2, 64); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator() throws Exception { + putAllGetAll(true, 4, 2, 1, 64); } /** + * @param restartCrd Coordinator restart flag. * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -997,6 +1030,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void putAllGetAll( + boolean restartCrd, final int srvs, final int clients, int cacheBackups, @@ -1120,7 +1154,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + restartCrd, + srvs, clients, cacheBackups, cacheParts, @@ -1130,6 +1166,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { null, writer, reader); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); } /** @@ -1351,7 +1390,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + false, + srvs, clients, cacheBackups, cacheParts, @@ -1488,7 +1529,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + false, + srvs, clients, cacheBackups, cacheParts, @@ -2432,7 +2475,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }; - readWriteTest(srvs, + readWriteTest( + false, + srvs, clients, cacheBackups, cacheParts, @@ -2458,6 +2503,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void readWriteTest( + final boolean restartCrd, final int srvs, final int clients, int cacheBackups, @@ -2468,18 +2514,36 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { IgniteInClosure<IgniteCache<Object, Object>> init, final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer, final GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader) throws Exception { + if (restartCrd) + CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure()); + Ignite srv0 = startGridsMultiThreaded(srvs); if (clients > 0) { client = true; startGridsMultiThreaded(srvs, clients); + + client = false; } - IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration(PARTITIONED, + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, cacheBackups, - cacheParts)); + cacheParts); + + if (restartCrd) + ccfg.setNodeFilter(new CoordinatorNodeFilter()); + + IgniteCache<Object, Object> cache = srv0.createCache(ccfg); + + int crdIdx = srvs + clients; + + if (restartCrd) { + nodeAttr = CRD_ATTR; + + startGrid(crdIdx); + } if (init != null) init.apply(cache); @@ -2507,6 +2571,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { writer.apply(idx, caches, stop); } catch (Throwable e) { + if (restartCrd && X.hasCause(e, ClusterTopologyException.class)) { + log.info("Writer error: " + e); + + return null; + } + error("Unexpected error: " + e, e); stop.set(true); @@ -2539,9 +2609,24 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }, readers, "reader"); - while (System.currentTimeMillis() < stopTime && !stop.get()) + while (System.currentTimeMillis() < stopTime && !stop.get()) { Thread.sleep(1000); + if (restartCrd) { + log.info("Start new coordinator: " + (crdIdx + 1)); + + startGrid(crdIdx + 1); + + log.info("Stop current coordinator: " + crdIdx); + + stopGrid(crdIdx); + + crdIdx++; + + awaitPartitionMapExchange(); + } + } + stop.set(true); writeFut.get(); @@ -2769,4 +2854,31 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** */ SCAN } + + /** + * + */ + static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return node.attribute(CRD_ATTR) == null; + } + } + + /** + * + */ + static class CoordinatorAssignClosure implements IgniteClosure<Collection<ClusterNode>, ClusterNode> { + @Override public ClusterNode apply(Collection<ClusterNode> clusterNodes) { + for (ClusterNode node : clusterNodes) { + if (node.attribute(CRD_ATTR) != null) { + assert !CU.clientNode(node) : node; + + return node; + } + } + + return null; + } + } }
