Repository: ignite
Updated Branches:
  refs/heads/master d36055ec1 -> 493e0aa9e


IGNITE-10374 Fixed race leading to rebalancing hang when WAL is disabled during 
rebalancing - Fixes #5468.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/493e0aa9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/493e0aa9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/493e0aa9

Branch: refs/heads/master
Commit: 493e0aa9e8451d36ff504560cc5309e8cd04a616
Parents: d36055e
Author: Sergey Chugunov <sergey.chugu...@gmail.com>
Authored: Sat Dec 1 19:33:14 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Sat Dec 1 19:34:39 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/WalStateManager.java       |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        |   3 +-
 .../dht/preloader/GridDhtPreloader.java         |  35 +--
 .../topology/GridClientPartitionTopology.java   |   2 +-
 .../dht/topology/GridDhtPartitionTopology.java  |   4 +-
 .../topology/GridDhtPartitionTopologyImpl.java  |  17 +-
 .../cache/persistence/CheckpointFuture.java     |   2 +-
 .../GridCacheDatabaseSharedManager.java         |   2 +-
 ...itePdsCacheWalDisabledOnRebalancingTest.java | 266 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite4.java  |   3 +
 10 files changed, 293 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index d17f20f..ed62cad 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -473,7 +473,7 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
     public void onGroupRebalanceFinished(int grpId, AffinityTopologyVersion 
topVer) {
         TemporaryDisabledWal session0 = tmpDisabledWal;
 
-        if (session0 == null || !session0.topVer.equals(topVer))
+        if (session0 == null || session0.topVer.compareTo(topVer) > 0)
             return;
 
         session0.remainingGrps.remove(grpId);
@@ -506,7 +506,7 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
 
                         assert grp != null;
 
-                        grp.topology().ownMoving(session0.topVer);
+                        grp.topology().ownMoving(topVer);
                     }
 
                     cctx.exchange().refreshPartitions();

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3384bb9..1eba8b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2101,7 +2101,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     grp.topology().onExchangeDone(this, 
grp.affinity().readyAffinity(res), false);
             }
 
-            cctx.walState().changeLocalStatesOnExchangeDone(res);
+            if (changedAffinity())
+                cctx.walState().changeLocalStatesOnExchangeDone(res);
         }
 
         final Throwable err0 = err;

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index eed0816..c8705d0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -186,38 +185,10 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         if (rebFut.isDone() && !rebFut.result())
             return true; // Required, previous rebalance cancelled.
 
-        final AffinityTopologyVersion exchTopVer = 
exchFut.context().events().topologyVersion();
+        AffinityTopologyVersion lastAffChangeTopVer =
+            
ctx.exchange().lastAffinityChangedTopologyVersion(exchFut.topologyVersion());
 
-        Collection<UUID> aliveNodes = 
ctx.discovery().aliveServerNodes().stream()
-            .map(ClusterNode::id)
-            .collect(Collectors.toList());
-
-        return assignmentsChanged(rebTopVer, exchTopVer) ||
-            !aliveNodes.containsAll(demander.remainingNodes()); // Some of 
nodes left before rabalance compelete.
-    }
-
-    /**
-     * @param oldTopVer Previous topology version.
-     * @param newTopVer New topology version to check result.
-     * @return {@code True} if affinity assignments changed between two 
versions for local node.
-     */
-    private boolean assignmentsChanged(AffinityTopologyVersion oldTopVer, 
AffinityTopologyVersion newTopVer) {
-        final AffinityAssignment aff = grp.affinity().readyAffinity(newTopVer);
-
-        // We should get affinity assignments based on previous rebalance to 
calculate difference.
-        // Whole history size described by IGNITE_AFFINITY_HISTORY_SIZE 
constant.
-        final AffinityAssignment prevAff = 
grp.affinity().cachedVersions().contains(oldTopVer) ?
-            grp.affinity().cachedAffinity(oldTopVer) : null;
-
-        if (prevAff == null)
-            return false;
-
-        boolean assignsChanged = false;
-
-        for (int p = 0; !assignsChanged && p < grp.affinity().partitions(); 
p++)
-            assignsChanged |= aff.get(p).contains(ctx.localNode()) != 
prevAff.get(p).contains(ctx.localNode());
-
-        return assignsChanged;
+        return lastAffChangeTopVer.compareTo(rebTopVer) > 0;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index fc8975b..9fb6825 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1110,7 +1110,7 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void ownMoving(AffinityTopologyVersion topVer) {
+    @Override public void ownMoving(AffinityTopologyVersion rebFinishedTopVer) 
{
         // No-op
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 81eab84..4b1e5a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -373,9 +373,9 @@ public interface GridDhtPartitionTopology {
     /**
      * Owns all moving partitions for the given topology version.
      *
-     * @param topVer Topology version.
+     * @param rebFinishedTopVer Topology version when rebalancing finished.
      */
-    public void ownMoving(AffinityTopologyVersion topVer);
+    public void ownMoving(AffinityTopologyVersion rebFinishedTopVer);
 
     /**
      * @param part Evicted partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 887f3d0..c90d40f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -2574,19 +2574,26 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void ownMoving(AffinityTopologyVersion topVer) {
+    @Override public void ownMoving(AffinityTopologyVersion rebFinishedTopVer) 
{
         lock.writeLock().lock();
 
+        AffinityTopologyVersion lastAffChangeVer = 
ctx.exchange().lastAffinityChangedTopologyVersion(lastTopChangeVer);
+
+        if (lastAffChangeVer.compareTo(rebFinishedTopVer) > 0)
+            log.info("Affinity topology changed, no MOVING partitions will be 
owned " +
+                "[rebFinishedTopVer=" + rebFinishedTopVer +
+                ", lastAffChangeVer=" + lastAffChangeVer + "]");
+
         try {
             for (GridDhtLocalPartition locPart : 
grp.topology().currentLocalPartitions()) {
                 if (locPart.state() == MOVING) {
                     boolean reserved = locPart.reserve();
 
                     try {
-                        if (reserved && locPart.state() == MOVING && 
lastTopChangeVer.equals(topVer))
-                            grp.topology().own(locPart);
-                        else // topology changed, rebalancing must be restarted
-                            return;
+                        if (reserved && locPart.state() == MOVING &&
+                            lastAffChangeVer.compareTo(rebFinishedTopVer) <= 0 
&&
+                            rebFinishedTopVer.compareTo(lastTopChangeVer) <= 0)
+                                grp.topology().own(locPart);
                     }
                     finally {
                         if (reserved)

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
index 1c77013..7671738 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
@@ -31,5 +31,5 @@ public interface CheckpointFuture  {
     /**
      * @return Finish future.
      */
-    public GridFutureAdapter finishFuture();
+    public GridFutureAdapter<Object> finishFuture();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index eb52ee6..d610a51 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -4277,7 +4277,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         }
 
         /** {@inheritDoc} */
-        @Override public GridFutureAdapter finishFuture() {
+        @Override public GridFutureAdapter<Object> finishFuture() {
             return cpFinishFut;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
new file mode 100644
index 0000000..1246db1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
+/**
+ * Test scenarios with rebalancing, IGNITE_DISABLE_WAL_DURING_REBALANCING 
optimization and topology changes
+ * such as client nodes join/leave, server nodes from BLT leave/join, server 
nodes out of BLT join/leave.
+ */
+public class IgnitePdsCacheWalDisabledOnRebalancingTest extends 
GridCommonAbstractTest {
+    /**
+     * Block message predicate handed to the {@link TestRecordingCommunicationSpi} of each node in
+     * {@code getConfiguration}; it is {@code null} until a test assigns it.
+     */
+    private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate;
+
+    /** Number of partitions of the replicated cache {@code cache1}. */
+    private static final int CACHE1_PARTS_NUM = 8;
+
+    /** Number of partitions of the partitioned cache {@code cache2}. */
+    private static final int CACHE2_PARTS_NUM = 16;
+
+    /** Number of partitions of the partitioned cache {@link #CACHE3_NAME}. */
+    private static final int CACHE3_PARTS_NUM = 32;
+
+    /** Number of sequential keys loaded into a cache by the tests. */
+    private static final int CACHE_SIZE = 2_000;
+
+    /** Name of the cache whose rebalancing is blocked and observed by the tests. */
+    private static final String CACHE3_NAME = "cache3";
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the parent set-up, cleans the persistence directory and sets the
+     * {@code IGNITE_DISABLE_WAL_DURING_REBALANCING} system property to {@code "true"}.
+     */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING,
 "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        
System.clearProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Every node is configured with three caches: {@code cache1} (REPLICATED, ATOMIC), {@code cache2}
+     * (PARTITIONED, TRANSACTIONAL, 1 backup) and {@code cache3} (PARTITIONED, ATOMIC, 2 backups), each with
+     * its own partition count. The instance named {@code "client"} is put into client mode; every other
+     * instance gets a data storage configuration with {@code LOG_ONLY} WAL mode and a persistent default
+     * data region limited to 256 MB. A {@link TestRecordingCommunicationSpi} is installed on every node and
+     * receives the current value of {@code blockMessagePredicate}, which may still be {@code null}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration("cache1")
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(CacheMode.REPLICATED)
+            .setAffinity(new RendezvousAffinityFunction(false, 
CACHE1_PARTS_NUM));
+
+        CacheConfiguration ccfg2 = new CacheConfiguration("cache2")
+            .setBackups(1)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAffinity(new RendezvousAffinityFunction(false, 
CACHE2_PARTS_NUM));
+
+        CacheConfiguration ccfg3 = new CacheConfiguration(CACHE3_NAME)
+            .setBackups(2)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAffinity(new RendezvousAffinityFunction(false, 
CACHE3_PARTS_NUM));
+
+        cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3);
+
+        if ("client".equals(igniteInstanceName))
+            cfg.setClientMode(true);
+        else {
+            DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+                
.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
+                .setWalMode(WALMode.LOG_ONLY)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(256 * 1024 * 1024));
+
+            cfg.setDataStorageConfiguration(dsCfg);
+        }
+
+        TestRecordingCommunicationSpi commSpi = new 
TestRecordingCommunicationSpi();
+        commSpi.blockMessages(blockMessagePredicate);
+
+        cfg.setCommunicationSpi(commSpi);
+
+
+        return cfg;
+    }
+
+    /**
+     * If a client joins and leaves the topology while a server node is rebalancing, rebalancing still
+     * finishes successfully and all partitions are owned as expected when it finishes.
+     * <p>
+     * Scenario: two servers are started and the three configured caches are filled; the second server is
+     * stopped and its persistence files are removed; it is restarted with demand messages for the
+     * {@code cache3} group blocked. While all {@code cache3} partitions on that node are still MOVING, a
+     * client joins and leaves. After the block is lifted, the MOVING partition count must drop to zero
+     * within 30 seconds.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientJoinsLeavesDuringRebalancing() throws Exception {
+        Ignite ig0 = startGrids(2);
+
+        ig0.active(true);
+
+        // Configured caches are named cache1..cache3, so the index runs from 1 to 3 inclusive.
+        for (int i = 1; i <= 3; i++)
+            fillCache(ig0.getOrCreateCache("cache" + i), CACHE_SIZE);
+
+        String ig1Name = "node01-" + grid(1).localNode().consistentId();
+
+        stopGrid(1);
+
+        // Remove the stopped node's storage so it comes back without its previous data.
+        cleanPersistenceFiles(ig1Name);
+
+        int groupId = ((IgniteEx) ig0).cachex(CACHE3_NAME).context().groupId();
+
+        // Nodes configured from now on hold back demand messages of the cache3 group.
+        blockMessagePredicate = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage)
+                return ((GridDhtPartitionDemandMessage) msg).groupId() == groupId;
+
+            return false;
+        };
+
+        IgniteEx ig1 = startGrid(1);
+
+        startGrid("client");
+
+        stopGrid("client");
+
+        CacheGroupMetricsMXBean mxBean = ig1.cachex(CACHE3_NAME).context().group().mxBean();
+
+        assertTrue("Unexpected moving partitions count: " + mxBean.getLocalNodeMovingPartitionsCount(),
+            mxBean.getLocalNodeMovingPartitionsCount() == CACHE3_PARTS_NUM);
+
+        TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi) ig1
+            .configuration().getCommunicationSpi();
+
+        commSpi.stopBlock();
+
+        boolean waitResult = GridTestUtils.waitForCondition(
+            () -> mxBean.getLocalNodeMovingPartitionsCount() == 0,
+            30_000);
+
+        assertTrue("Failed to wait for owning all partitions, parts in moving state: "
+            + mxBean.getLocalNodeMovingPartitionsCount(), waitResult);
+    }
+
+    /**
+     * If server nodes from BLT leave topology and then join again after additional keys were put to caches,
+     * rebalance starts.
+     * <p>
+     * Test verifies that all moving partitions get owned after rebalance finishes.
+     * <p>
+     * Scenario: four servers are started and {@code cache3} is filled; grids 1 and 2 are stopped and entries
+     * for the keys returned by {@code nearKeys} for those two grids are put through grid 0. Grid 1 is then
+     * restarted with demand messages for the {@code cache3} group blocked, and grid 2 is restarted while
+     * that block is still in place. Only the communication SPI of grid 1 is unblocked afterwards, and only
+     * grid 1 is checked: its MOVING partition count must reach zero within 30 seconds.
+     *
+     * @throws Exception If failed.
+     */
+    public void testServerNodesFromBltLeavesAndJoinsDuringRebalancing() throws 
Exception {
+        Ignite ig0 = startGridsMultiThreaded(4);
+
+        fillCache(ig0.cache(CACHE3_NAME), CACHE_SIZE);
+
+        List<Integer> nonAffinityKeys1 = nearKeys(grid(1).cache(CACHE3_NAME), 
100, CACHE_SIZE / 2);
+        List<Integer> nonAffinityKeys2 = nearKeys(grid(2).cache(CACHE3_NAME), 
100, CACHE_SIZE / 2);
+
+        stopGrid(1);
+        stopGrid(2);
+
+        Set<Integer> nonAffinityKeysSet = new HashSet<>();
+
+        nonAffinityKeysSet.addAll(nonAffinityKeys1);
+        nonAffinityKeysSet.addAll(nonAffinityKeys2);
+
+        fillCache(ig0.cache(CACHE3_NAME), nonAffinityKeysSet);
+
+        int groupId = ((IgniteEx) ig0).cachex(CACHE3_NAME).context().groupId();
+
+        blockMessagePredicate = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage)
+                return ((GridDhtPartitionDemandMessage) msg).groupId() == 
groupId;
+
+            return false;
+        };
+
+        IgniteEx ig1 = startGrid(1);
+
+        CacheGroupMetricsMXBean mxBean = 
ig1.cachex(CACHE3_NAME).context().group().mxBean();
+
+        TestRecordingCommunicationSpi commSpi = 
(TestRecordingCommunicationSpi) ig1
+            .configuration().getCommunicationSpi();
+
+        startGrid(2);
+
+        commSpi.stopBlock();
+
+        boolean allOwned = GridTestUtils.waitForCondition(
+            () -> mxBean.getLocalNodeMovingPartitionsCount() == 0, 30_000);
+
+        assertTrue("Partitions were not owned, there are " + 
mxBean.getLocalNodeMovingPartitionsCount() +
+            " partitions in MOVING state", allOwned);
+    }
+
+    /**
+     * Removes the persistent data of a node. The node's directory under {@code DFLT_STORE_DIR} (resolved
+     * against the default work directory) is deleted and re-created, and the node's directory under
+     * {@code DFLT_STORE_DIR/wal} is deleted.
+     *
+     * @param igName Name of the node's storage folder.
+     * @throws Exception If a directory cannot be resolved or re-created.
+     */
+    private void cleanPersistenceFiles(String igName) throws Exception {
+        String ig1DbPath = Paths.get(DFLT_STORE_DIR, igName).toString();
+
+        File igDbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), 
ig1DbPath, false);
+
+        U.delete(igDbDir);
+        Files.createDirectory(igDbDir.toPath());
+
+        String ig1DbWalPath = Paths.get(DFLT_STORE_DIR, "wal", 
igName).toString();
+
+        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
ig1DbWalPath, false));
+    }
+
+    /**
+     * Puts the entry {@code i -> "value_" + i} for every {@code i} in {@code [0, cacheSize)}.
+     *
+     * @param cache Cache to fill.
+     * @param cacheSize Number of sequential keys to put, starting from 0.
+     */
+    private void fillCache(IgniteCache<Integer, String> cache, int cacheSize) {
+        for (int i = 0; i < cacheSize; i++)
+            cache.put(i, "value_" + i);
+    }
+
+    /**
+     * Puts the entry {@code key -> "value_" + key} for every key of the given collection.
+     *
+     * @param cache Cache to fill.
+     * @param keys Keys to put.
+     */
+    private void fillCache(IgniteCache<Integer, String> cache, Collection<Integer> keys) {
+        for (Integer key : keys)
+            cache.put(key, "value_" + key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/493e0aa9/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 027f341..ad6bbbf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactiva
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheWalDisabledOnRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
@@ -50,6 +51,8 @@ public class IgnitePdsTestSuite4 extends TestSuite {
 
         suite.addTestSuite(ResetLostPartitionTest.class);
 
+        suite.addTestSuite(IgnitePdsCacheWalDisabledOnRebalancingTest.class);
+
         return suite;
     }
 

Reply via email to