This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 40f1fc9  IGNITE-10883 Fix and refactoring 
IgniteRebalanceOnCachesStoppingOrDestroyingTest flaky test - Fixes #5795.
40f1fc9 is described below

commit 40f1fc94c4c8755e36cc55dd9f8124cc76d7b5f1
Author: DmitriyGovorukhin <dmitriy.govoruk...@gmail.com>
AuthorDate: Tue Dec 25 14:01:50 2018 +0300

    IGNITE-10883 Fix and refactoring 
IgniteRebalanceOnCachesStoppingOrDestroyingTest flaky test - Fixes #5795.
    
    Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
---
 .../managers/discovery/GridDiscoveryManager.java   |   2 +-
 .../internal/processors/cache/WalStateManager.java |   6 +-
 ...eRebalanceOnCachesStoppingOrDestroyingTest.java | 207 ++++++++++++---------
 3 files changed, 126 insertions(+), 89 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 556af19..6c72258 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2486,7 +2486,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      *
      * @param cacheMap Map to add to.
      * @param cacheName Cache name.
-     * @param rich Node to add
+     * @param node Node to add
      */
     private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String 
cacheName, ClusterNode node) {
         List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 937f1f0..83c548e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -507,9 +507,11 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
                     for (Integer grpId0 : session0.disabledGrps) {
                         CacheGroupContext grp = 
cctx.cache().cacheGroup(grpId0);
 
-                        assert grp != null;
+                        if (grp != null)
+                            grp.topology().ownMoving(topVer);
+                        else if (log.isDebugEnabled())
+                            log.debug("Cache group was destroyed before 
checkpoint finished, [grpId=" + grpId0 + ']');
 
-                        grp.topology().ownMoving(topVer);
                     }
 
                     cctx.exchange().refreshPartitions();
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
index 5c7f6c0..0ef2289 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
@@ -19,16 +19,16 @@ package 
org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -37,19 +37,15 @@ import 
org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.MvccFeatureChecker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -82,6 +78,9 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest 
extends GridCommonA
     /** */
     private static final int REBALANCE_BATCH_SIZE = 50 * 1024;
 
+    /** Number of loaded keys in each cache. */
+    private static final int KEYS_SIZE = 3000;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -112,12 +111,12 @@ public class 
IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
             .setDefaultTxTimeout(1000));
 
         cfg.setDataStorageConfiguration(
-                new DataStorageConfiguration()
-                        .setWalMode(WALMode.LOG_ONLY)
-                        .setDefaultDataRegionConfiguration(
-                                new DataRegionConfiguration()
-                                        .setPersistenceEnabled(true)
-                                        .setMaxSize(100L * 1024 * 1024)));
+            new DataStorageConfiguration()
+                .setWalMode(WALMode.LOG_ONLY)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(100L * 1024 * 1024)));
 
         return cfg;
     }
@@ -126,7 +125,23 @@ public class 
IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
      *
      */
     @Test
-    public void testStopCachesOnDeactivation() throws Exception {
+    public void testStopCachesOnDeactivationFirstGroup() throws Exception {
+        testStopCachesOnDeactivation(GROUP_1);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testStopCachesOnDeactivationSecondGroup() throws Exception {
+        testStopCachesOnDeactivation(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testStopCachesOnDeactivation(String groupName) throws 
Exception {
         if (MvccFeatureChecker.forcedMvcc())
             fail("https://issues.apache.org/jira/browse/IGNITE-10582";);
 
@@ -137,26 +152,58 @@ public class 
IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
             ig.cluster().active(true);
 
             return null;
-        });
+        }, groupName);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroySpecificCachesInDifferentCacheGroupsFirstGroup() 
throws Exception {
+        testDestroySpecificCachesInDifferentCacheGroups(GROUP_1);
     }
 
     /**
      *
      */
     @Test
-    public void testDestroySpecificCachesInDifferentCacheGroups() throws 
Exception {
+    public void testDestroySpecificCachesInDifferentCacheGroupsSecondGroup() 
throws Exception {
+        testDestroySpecificCachesInDifferentCacheGroups(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testDestroySpecificCachesInDifferentCacheGroups(String 
groupName) throws Exception {
         performTest(ig -> {
             ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3));
 
             return null;
-        });
+        }, groupName);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroySpecificCacheAndCacheGroupFirstGroup() throws 
Exception {
+        testDestroySpecificCacheAndCacheGroup(GROUP_1);
     }
 
     /**
      *
      */
     @Test
-    public void testDestroySpecificCacheAndCacheGroup() throws Exception {
+    public void testDestroySpecificCacheAndCacheGroupSecondGroup() throws 
Exception {
+        testDestroySpecificCacheAndCacheGroup(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testDestroySpecificCacheAndCacheGroup(String groupName) 
throws Exception {
         if (MvccFeatureChecker.forcedMvcc())
             fail("https://issues.apache.org/jira/browse/IGNITE-10582";);
 
@@ -164,13 +211,13 @@ public class 
IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
             ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3, CACHE_4));
 
             return null;
-        });
+        }, groupName);
     }
 
     /**
      * @param testAction Action that trigger stop or destroy of caches.
      */
-    private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) 
throws Exception {
+    private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction, 
String groupName) throws Exception {
         IgniteEx ig0 = (IgniteEx)startGrids(2);
 
         ig0.cluster().active(true);
@@ -179,13 +226,27 @@ public class 
IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
 
         loadData(ig0);
 
-        startGrid(1);
+        IgniteEx ig1 = startGrid(1);
+
+        RebalanceBlockingSPI commSpi = 
(RebalanceBlockingSPI)ig1.configuration().getCommunicationSpi();
 
-        runLoad(ig0);
+        // Complete all futures for groups that we don't need to wait.
+        commSpi.resumeRebalanceFutures.forEach((k, v) -> {
+            if (k != CU.cacheId(groupName))
+                v.onDone();
+        });
+
+        CountDownLatch latch = 
commSpi.suspendRebalanceInMiddleLatch.get(CU.cacheId(groupName));
+
+        assert latch != null;
+
+        // Await some middle point rebalance for group.
+        latch.await();
 
         testAction.accept(ig0);
 
-        U.sleep(1000);
+        // Resume rebalance after action performed.
+        commSpi.resumeRebalanceFutures.get(CU.cacheId(groupName)).onDone();
 
         awaitPartitionMapExchange(true, true, null, true);
 
@@ -197,22 +258,22 @@ public class 
IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
      */
     private void loadData(Ignite ig) {
         List<CacheConfiguration> configs = Stream.of(
-                F.t(CACHE_1, GROUP_1),
-                F.t(CACHE_2, GROUP_1),
-                F.t(CACHE_3, GROUP_2),
-                F.t(CACHE_4, GROUP_2)
+            F.t(CACHE_1, GROUP_1),
+            F.t(CACHE_2, GROUP_1),
+            F.t(CACHE_3, GROUP_2),
+            F.t(CACHE_4, GROUP_2)
         ).map(names -> new CacheConfiguration<>(names.get1())
-                .setGroupName(names.get2())
-                .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
-                .setCacheMode(CacheMode.REPLICATED)
-                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setGroupName(names.get2())
+            .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
+            .setCacheMode(CacheMode.REPLICATED)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
         ).collect(Collectors.toList());
 
         ig.getOrCreateCaches(configs);
 
         configs.forEach(cfg -> {
             try (IgniteDataStreamer<Object, Object> streamer = 
ig.dataStreamer(cfg.getName())) {
-                for (int i = 0; i < 3_000; i++)
+                for (int i = 0; i < KEYS_SIZE; i++)
                     streamer.addData(i, new byte[1024]);
 
                 streamer.flush();
@@ -221,69 +282,43 @@ public class 
IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
     }
 
     /**
-     * @param ig Ignite instance.
-     */
-    private void runLoad(Ignite ig) throws Exception{
-        GridTestUtils.runMultiThreaded(new Runnable() {
-            @Override public void run() {
-                String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4);
-
-                IgniteCache cache = ig.cache(cacheName);
-
-                for (int i = 0; i < 3_000; i++) {
-                    int idx = ThreadLocalRandom.current().nextInt(3_000);
-
-                    while (true) {
-                        try {
-                            cache.put(idx, new byte[1024]);
-
-                            break;
-                        }
-                        catch (Exception e) {
-                            MvccFeatureChecker.assertMvccWriteConflict(e);
-                        }
-                    }
-                }
-            }
-        }, 4, "load-thread");
-    }
-
-    /**
      *
      */
     private static class RebalanceBlockingSPI extends TcpCommunicationSpi {
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg) 
throws IgniteSpiException {
-            slowDownMessage(msg);
-
-            super.sendMessage(node, msg);
-
+        /** */
+        private final Map<Integer, GridFutureAdapter> resumeRebalanceFutures = 
new ConcurrentHashMap<>();
+
+        /** */
+        private final Map<Integer, CountDownLatch> 
suspendRebalanceInMiddleLatch = new ConcurrentHashMap<>();
+
+        /** */
+        RebalanceBlockingSPI() {
+            resumeRebalanceFutures.put(CU.cacheId(GROUP_1), new 
GridFutureAdapter());
+            resumeRebalanceFutures.put(CU.cacheId(GROUP_2), new 
GridFutureAdapter());
+            suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_1), new 
CountDownLatch(3));
+            suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_2), new 
CountDownLatch(3));
         }
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg,
-                                          IgniteInClosure<IgniteException> 
ackC) throws IgniteSpiException {
-            slowDownMessage(msg);
+        @Override protected void notifyListener(UUID sndId, Message msg, 
IgniteRunnable msgC) {
+            if (msg instanceof GridIoMessage &&
+                ((GridIoMessage)msg).message() instanceof 
GridDhtPartitionSupplyMessage) {
+                GridDhtPartitionSupplyMessage msg0 = 
(GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message();
 
-            super.sendMessage(node, msg, ackC);
-        }
+                CountDownLatch latch = 
suspendRebalanceInMiddleLatch.get(msg0.groupId());
 
-        /**
-         * @param msg Message.
-         */
-        private void slowDownMessage(Message msg) {
-            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() 
instanceof GridDhtPartitionSupplyMessage) {
-                int grpId = 
((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+                if (latch != null) {
+                    if (latch.getCount() > 0)
+                        latch.countDown();
+                    else {
+                        resumeRebalanceFutures.get(msg0.groupId()).listen(f -> 
super.notifyListener(sndId, msg, msgC));
 
-                if (grpId == CU.cacheId(GROUP_1) || grpId == 
CU.cacheId(GROUP_2)) {
-                    try {
-                        U.sleep(50);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        e.printStackTrace();
+                        return;
                     }
                 }
             }
+
+            super.notifyListener(sndId, msg, msgC);
         }
     }
 }

Reply via email to