This is an automated email from the ASF dual-hosted git repository. dgovorukhin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 40f1fc9 IGNITE-10883 Fix and refactoring IgniteRebalanceOnCachesStoppingOrDestroyingTest flaky test - Fixes #5795. 40f1fc9 is described below commit 40f1fc94c4c8755e36cc55dd9f8124cc76d7b5f1 Author: DmitriyGovorukhin <dmitriy.govoruk...@gmail.com> AuthorDate: Tue Dec 25 14:01:50 2018 +0300 IGNITE-10883 Fix and refactoring IgniteRebalanceOnCachesStoppingOrDestroyingTest flaky test - Fixes #5795. Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> --- .../managers/discovery/GridDiscoveryManager.java | 2 +- .../internal/processors/cache/WalStateManager.java | 6 +- ...eRebalanceOnCachesStoppingOrDestroyingTest.java | 207 ++++++++++++--------- 3 files changed, 126 insertions(+), 89 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 556af19..6c72258 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2486,7 +2486,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheMap Map to add to. * @param cacheName Cache name. - * @param rich Node to add + * @param node Node to add */ private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) { List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 937f1f0..83c548e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -507,9 +507,11 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { for (Integer grpId0 : session0.disabledGrps) { CacheGroupContext grp = cctx.cache().cacheGroup(grpId0); - assert grp != null; + if (grp != null) + grp.topology().ownMoving(topVer); + else if (log.isDebugEnabled()) + log.debug("Cache group was destroyed before checkpoint finished, [grpId=" + grpId0 + ']'); - grp.topology().ownMoving(topVer); } cctx.exchange().refreshPartitions(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java index 5c7f6c0..0ef2289 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java @@ -19,16 +19,16 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -37,19 +37,15 @@ import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.MvccFeatureChecker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -82,6 +78,9 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA /** */ private static final int REBALANCE_BATCH_SIZE = 50 * 1024; + /** Number of loaded keys in each cache. */ + private static final int KEYS_SIZE = 3000; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -112,12 +111,12 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA .setDefaultTxTimeout(1000)); cfg.setDataStorageConfiguration( - new DataStorageConfiguration() - .setWalMode(WALMode.LOG_ONLY) - .setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setPersistenceEnabled(true) - .setMaxSize(100L * 1024 * 1024))); + new DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(100L * 1024 * 1024))); return cfg; } @@ -126,7 +125,23 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA * */ @Test - public void testStopCachesOnDeactivation() throws Exception { + public void testStopCachesOnDeactivationFirstGroup() throws Exception { + testStopCachesOnDeactivation(GROUP_1); + } + + /** + * + */ + @Test + public void testStopCachesOnDeactivationSecondGroup() throws Exception { + testStopCachesOnDeactivation(GROUP_2); + } + + /** + * @param groupName Group name. + * @throws Exception If failed. + */ + private void testStopCachesOnDeactivation(String groupName) throws Exception { if (MvccFeatureChecker.forcedMvcc()) fail("https://issues.apache.org/jira/browse/IGNITE-10582"); @@ -137,26 +152,58 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA ig.cluster().active(true); return null; - }); + }, groupName); + } + + /** + * + */ + @Test + public void testDestroySpecificCachesInDifferentCacheGroupsFirstGroup() throws Exception { + testDestroySpecificCachesInDifferentCacheGroups(GROUP_1); } /** * */ @Test - public void testDestroySpecificCachesInDifferentCacheGroups() throws Exception { + public void testDestroySpecificCachesInDifferentCacheGroupsSecondGroup() throws Exception { + testDestroySpecificCachesInDifferentCacheGroups(GROUP_2); + } + + /** + * @param groupName Group name. + * @throws Exception If failed. + */ + private void testDestroySpecificCachesInDifferentCacheGroups(String groupName) throws Exception { performTest(ig -> { ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3)); return null; - }); + }, groupName); + } + + /** + * + */ + @Test + public void testDestroySpecificCacheAndCacheGroupFirstGroup() throws Exception { + testDestroySpecificCacheAndCacheGroup(GROUP_1); } /** * */ @Test - public void testDestroySpecificCacheAndCacheGroup() throws Exception { + public void testDestroySpecificCacheAndCacheGroupSecondGroup() throws Exception { + testDestroySpecificCacheAndCacheGroup(GROUP_2); + } + + /** + * @param groupName Group name. + * @throws Exception If failed. + */ + private void testDestroySpecificCacheAndCacheGroup(String groupName) throws Exception { if (MvccFeatureChecker.forcedMvcc()) fail("https://issues.apache.org/jira/browse/IGNITE-10582"); @@ -164,13 +211,13 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3, CACHE_4)); return null; - }); + }, groupName); } /** * @param testAction Action that trigger stop or destroy of caches. */ - private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throws Exception { + private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction, String groupName) throws Exception { IgniteEx ig0 = (IgniteEx)startGrids(2); ig0.cluster().active(true); @@ -179,13 +226,27 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA loadData(ig0); - startGrid(1); + IgniteEx ig1 = startGrid(1); + + RebalanceBlockingSPI commSpi = (RebalanceBlockingSPI)ig1.configuration().getCommunicationSpi(); - runLoad(ig0); + // Complete all futures for groups that we don't need to wait. + commSpi.resumeRebalanceFutures.forEach((k, v) -> { + if (k != CU.cacheId(groupName)) + v.onDone(); + }); + + CountDownLatch latch = commSpi.suspendRebalanceInMiddleLatch.get(CU.cacheId(groupName)); + + assert latch != null; + + // Await some middle point rebalance for group. + latch.await(); testAction.accept(ig0); - U.sleep(1000); + // Resume rebalance after action performed. + commSpi.resumeRebalanceFutures.get(CU.cacheId(groupName)).onDone(); awaitPartitionMapExchange(true, true, null, true); @@ -197,22 +258,22 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA */ private void loadData(Ignite ig) { List<CacheConfiguration> configs = Stream.of( - F.t(CACHE_1, GROUP_1), - F.t(CACHE_2, GROUP_1), - F.t(CACHE_3, GROUP_2), - F.t(CACHE_4, GROUP_2) + F.t(CACHE_1, GROUP_1), + F.t(CACHE_2, GROUP_1), + F.t(CACHE_3, GROUP_2), + F.t(CACHE_4, GROUP_2) ).map(names -> new CacheConfiguration<>(names.get1()) - .setGroupName(names.get2()) - .setRebalanceBatchSize(REBALANCE_BATCH_SIZE) - .setCacheMode(CacheMode.REPLICATED) - .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setGroupName(names.get2()) + .setRebalanceBatchSize(REBALANCE_BATCH_SIZE) + .setCacheMode(CacheMode.REPLICATED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) ).collect(Collectors.toList()); ig.getOrCreateCaches(configs); configs.forEach(cfg -> { try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) { - for (int i = 0; i < 3_000; i++) + for (int i = 0; i < KEYS_SIZE; i++) streamer.addData(i, new byte[1024]); streamer.flush(); @@ -221,69 +282,43 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA } /** - * @param ig Ignite instance. - */ - private void runLoad(Ignite ig) throws Exception{ - GridTestUtils.runMultiThreaded(new Runnable() { - @Override public void run() { - String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4); - - IgniteCache cache = ig.cache(cacheName); - - for (int i = 0; i < 3_000; i++) { - int idx = ThreadLocalRandom.current().nextInt(3_000); - - while (true) { - try { - cache.put(idx, new byte[1024]); - - break; - } - catch (Exception e) { - MvccFeatureChecker.assertMvccWriteConflict(e); - } - } - } - } - }, 4, "load-thread"); - } - - /** * */ private static class RebalanceBlockingSPI extends TcpCommunicationSpi { - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { - slowDownMessage(msg); - - super.sendMessage(node, msg); - + /** */ + private final Map<Integer, GridFutureAdapter> resumeRebalanceFutures = new ConcurrentHashMap<>(); + + /** */ + private final Map<Integer, CountDownLatch> suspendRebalanceInMiddleLatch = new ConcurrentHashMap<>(); + + /** */ + RebalanceBlockingSPI() { + resumeRebalanceFutures.put(CU.cacheId(GROUP_1), new GridFutureAdapter()); + resumeRebalanceFutures.put(CU.cacheId(GROUP_2), new GridFutureAdapter()); + suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_1), new CountDownLatch(3)); + suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_2), new CountDownLatch(3)); } /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, - IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { - slowDownMessage(msg); + @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) { + if (msg instanceof GridIoMessage && + ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) { + GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message(); - super.sendMessage(node, msg, ackC); - } + CountDownLatch latch = suspendRebalanceInMiddleLatch.get(msg0.groupId()); - /** - * @param msg Message. - */ - private void slowDownMessage(Message msg) { - if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) { - int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId(); + if (latch != null) { + if (latch.getCount() > 0) + latch.countDown(); + else { + resumeRebalanceFutures.get(msg0.groupId()).listen(f -> super.notifyListener(sndId, msg, msgC)); - if (grpId == CU.cacheId(GROUP_1) || grpId == CU.cacheId(GROUP_2)) { - try { - U.sleep(50); - } - catch (IgniteInterruptedCheckedException e) { - e.printStackTrace(); + return; } } } + + super.notifyListener(sndId, msg, msgC); } } }