Repository: ignite
Updated Branches:
  refs/heads/master ea8bfac60 -> 8826fa13e


IGNITE-8017 Disable WAL during initial rebalancing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8826fa13
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8826fa13
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8826fa13

Branch: refs/heads/master
Commit: 8826fa13edd0ab776b9fa2ab482b5a216b99b103
Parents: ea8bfac
Author: Ilya Lantukh <[email protected]>
Authored: Wed Apr 18 17:52:20 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Wed Apr 18 17:52:20 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   7 +
 .../processors/cache/CacheGroupContext.java     |  54 +-
 .../processors/cache/WalStateManager.java       | 160 +++++-
 .../dht/GridClientPartitionTopology.java        |   5 +
 .../dht/GridDhtPartitionTopology.java           |   7 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  27 +
 .../dht/preloader/GridDhtPartitionDemander.java |  24 +-
 .../GridDhtPartitionsExchangeFuture.java        |   2 +
 .../GridCacheDatabaseSharedManager.java         |  63 +-
 .../IgniteCacheDatabaseSharedManager.java       |   4 +-
 .../persistence/file/FilePageStoreManager.java  |  18 +-
 ...lWalModeChangeDuringRebalancingSelfTest.java | 571 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   3 +
 13 files changed, 895 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 437f49f..32fed05 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -878,6 +878,13 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD = 
"IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD";
 
     /**
+     * When set to {@code true}, WAL will be automatically disabled during 
rebalancing if there is no partition in
+     * OWNING state.
+     * Default is {@code false}.
+     */
+    public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = 
"IGNITE_DISABLE_WAL_DURING_REBALANCING";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 12636f3..849ecc8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -152,7 +152,10 @@ public class CacheGroupContext {
     private CacheGroupMetricsMXBean mxBean;
 
     /** */
-    private volatile boolean walEnabled;
+    private volatile boolean localWalEnabled;
+
+    /** */
+    private volatile boolean globalWalEnabled;
 
     /**
      * @param grpId Group ID.
@@ -196,9 +199,10 @@ public class CacheGroupContext {
         this.reuseList = reuseList;
         this.locStartVer = locStartVer;
         this.cacheType = cacheType;
-        this.walEnabled = walEnabled;
+        this.globalWalEnabled = walEnabled;
+        this.localWalEnabled = true;
 
-        persistWalState(walEnabled);
+        persistGlobalWalState(walEnabled);
 
         ioPlc = cacheType.ioPolicy();
 
@@ -1021,22 +1025,52 @@ public class CacheGroupContext {
      * WAL enabled flag.
      */
     public boolean walEnabled() {
-        return walEnabled;
+        return localWalEnabled && globalWalEnabled;
+    }
+
+    /**
+     * Local WAL enabled flag.
+     */
+    public boolean localWalEnabled() {
+        return localWalEnabled;
+    }
+
+    /**
+     * Global WAL enabled flag.
+     */
+    public boolean globalWalEnabled() {
+        return globalWalEnabled;
     }
 
     /**
-     * @param enabled WAL enabled flag.
+     * @param enabled Global WAL enabled flag.
      */
-    public void walEnabled(boolean enabled) {
-        persistWalState(enabled);
+    public void globalWalEnabled(boolean enabled) {
+        persistGlobalWalState(enabled);
 
-        this.walEnabled = enabled;
+        this.globalWalEnabled = enabled;
+    }
+
+    /**
+     * @param enabled Local WAL enabled flag.
+     */
+    public void localWalEnabled(boolean enabled) {
+        persistLocalWalState(enabled);
+
+        this.localWalEnabled = enabled;
+    }
+
+    /**
+     * @param enabled Enabled flag.
+     */
+    private void persistGlobalWalState(boolean enabled) {
+        shared().database().walEnabled(grpId, enabled, false);
     }
 
     /**
      * @param enabled Enabled flag..
      */
-    private void persistWalState(boolean enabled) {
-        shared().database().walEnabled(grpId, enabled);
+    private void persistLocalWalState(boolean enabled) {
+        shared().database().walEnabled(grpId, enabled, true);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 64a6819..4a14730 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -26,10 +28,13 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -44,15 +49,18 @@ import org.jetbrains.annotations.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.ignite.internal.GridTopic.TOPIC_WAL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  * Write-ahead log state manager. Manages WAL enable and disable.
@@ -102,6 +110,9 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
     /** Disconnected flag. */
     private boolean disconnected;
 
+    /** Holder for groups with temporarily disabled WAL. */
+    private volatile TemporaryDisabledWal tmpDisabledWal;
+
     /**
      * Constructor.
      *
@@ -328,6 +339,126 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Change local WAL state before exchange is done. This method will 
disable WAL for groups without partitions
+     * in OWNING state if such feature is enabled.
+     *
+     * @param topVer Topology version.
+     */
+    public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion 
topVer) {
+        if 
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING,
 false))
+            return;
+
+        Set<Integer> grpsToEnableWal = new HashSet<>();
+        Set<Integer> grpsToDisableWal = new HashSet<>();
+        Set<Integer> grpsWithWalDisabled = new HashSet<>();
+
+        boolean hasNonEmptyOwning = false;
+
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal() || !grp.affinityNode() || 
!grp.persistenceEnabled())
+                continue;
+
+            boolean hasOwning = false;
+
+            for (GridDhtLocalPartition locPart : 
grp.topology().currentLocalPartitions()) {
+                if (locPart.state() == OWNING) {
+                    hasOwning = true;
+
+                    if (hasNonEmptyOwning)
+                        break;
+
+                    if (locPart.updateCounter() > 0) {
+                        hasNonEmptyOwning = true;
+
+                        break;
+                    }
+                }
+            }
+
+            if (hasOwning && !grp.localWalEnabled()) {
+                grpsToEnableWal.add(grp.groupId());
+            }
+            else if (!hasOwning && grp.localWalEnabled()) {
+                grpsToDisableWal.add(grp.groupId());
+
+                grpsWithWalDisabled.add(grp.groupId());
+            }
+            else if (!grp.localWalEnabled())
+                grpsWithWalDisabled.add(grp.groupId());
+        }
+
+        tmpDisabledWal = new TemporaryDisabledWal(grpsWithWalDisabled, topVer);
+
+        if (grpsToEnableWal.isEmpty() && grpsToDisableWal.isEmpty())
+            return;
+
+        try {
+            if (hasNonEmptyOwning && !grpsToEnableWal.isEmpty())
+                triggerCheckpoint(0).finishFuture().get();
+        }
+        catch (IgniteCheckedException ex) {
+            throw new IgniteException(ex);
+        }
+
+        for (Integer grpId : grpsToEnableWal)
+            cctx.cache().cacheGroup(grpId).localWalEnabled(true);
+
+        for (Integer grpId : grpsToDisableWal)
+            cctx.cache().cacheGroup(grpId).localWalEnabled(false);
+    }
+
+    /**
+     * Callback when group rebalancing is finished. If there are no pending 
groups, it should trigger checkpoint and
+     * change partition states.
+     * @param grpId Group ID.
+     * @param topVer Topology version.
+     */
+    public void onGroupRebalanceFinished(int grpId, AffinityTopologyVersion 
topVer) {
+        TemporaryDisabledWal session0 = tmpDisabledWal;
+
+        if (session0 == null || !session0.topVer.equals(topVer))
+            return;
+
+        session0.remainingGrps.remove(grpId);
+
+        if (session0.remainingGrps.isEmpty()) {
+            synchronized (mux) {
+                if (tmpDisabledWal != session0)
+                    return;
+
+                for (Integer grpId0 : session0.disabledGrps) {
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
+
+                    assert grp != null;
+
+                    if (!grp.localWalEnabled())
+                        grp.localWalEnabled(true);
+                }
+
+                tmpDisabledWal = null;
+            }
+
+            CheckpointFuture cpFut = triggerCheckpoint(0);
+
+            assert cpFut != null;
+
+            cpFut.finishFuture().listen(new 
IgniteInClosureX<IgniteInternalFuture>() {
+                @Override public void applyx(IgniteInternalFuture future) {
+                    for (Integer grpId0 : session0.disabledGrps) {
+                        CacheGroupContext grp = 
cctx.cache().cacheGroup(grpId0);
+
+                        assert grp != null;
+
+                        grp.topology().ownMoving(session0.topVer);
+                    }
+
+                    cctx.exchange().refreshPartitions();
+                }
+            });
+        }
+    }
+
+    /**
      * Handle propose message in discovery thread.
      *
      * @param msg Message.
@@ -455,7 +586,7 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
                         "no longer exist: " + msg.caches().keySet());
                 }
                 else {
-                    if (F.eq(msg.enable(), grpCtx.walEnabled()))
+                    if (F.eq(msg.enable(), grpCtx.globalWalEnabled()))
                         // Nothing changed -> no-op.
                         res = new WalStateResult(msg, false);
                     else {
@@ -468,7 +599,7 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
                                 cpFut.beginFuture().get();
 
                                 if (msg.enable()) {
-                                    grpCtx.walEnabled(true);
+                                    grpCtx.globalWalEnabled(true);
 
                                     // Enable: it is enough to release cache 
operations once mark is finished because
                                     // not-yet-flushed dirty pages have been 
logged.
@@ -489,7 +620,7 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
 
                                     // WAL state is persisted after checkpoint 
if finished. Otherwise in case of crash
                                     // and restart we will think that WAL is 
enabled, but data might be corrupted.
-                                    grpCtx.walEnabled(false);
+                                    grpCtx.globalWalEnabled(false);
                                 }
                             }
                             catch (Exception e) {
@@ -917,4 +1048,27 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
             onCompletedLocally(res);
         }
     }
+
+    /**
+     * Tracks the groups with temporarily disabled WAL for a single topology version.
+     */
+    private static class TemporaryDisabledWal {
+        /** Groups with disabled WAL. */
+        private final Set<Integer> disabledGrps;
+
+        /** Remaining groups. */
+        private final Set<Integer> remainingGrps;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
+        public TemporaryDisabledWal(
+            Set<Integer> disabledGrps,
+            AffinityTopologyVersion topVer) {
+            this.disabledGrps = Collections.unmodifiableSet(disabledGrps);
+            this.remainingGrps = new HashSet<>(disabledGrps);
+            this.topVer = topVer;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 3e3bb0d..dcb8b96 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -1104,6 +1104,11 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void ownMoving(AffinityTopologyVersion topVer) {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
     @Override public void onEvicted(GridDhtLocalPartition part, boolean 
updateSeq) {
         assert updateSeq || lock.isWriteLockedByCurrentThread();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d586a94d..2df2e89 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -357,6 +357,13 @@ public interface GridDhtPartitionTopology {
     public boolean own(GridDhtLocalPartition part);
 
     /**
+     * Owns all moving partitions for the given topology version.
+     *
+     * @param topVer Topology version.
+     */
+    public void ownMoving(AffinityTopologyVersion topVer);
+
+    /**
      * @param part Evicted partition.
      * @param updateSeq Update sequence increment flag.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 164f0bf..68104a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -2410,6 +2410,33 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void ownMoving(AffinityTopologyVersion topVer) {
+        lock.writeLock().lock();
+
+        try {
+            for (GridDhtLocalPartition locPart : 
grp.topology().currentLocalPartitions()) {
+                if (locPart.state() == MOVING) {
+                    boolean reserved = locPart.reserve();
+
+                    try {
+                        if (reserved && locPart.state() == MOVING && 
lastTopChangeVer.equals(topVer))
+                            grp.topology().own(locPart);
+                        else // topology changed, rebalancing must be restarted
+                            return;
+                    }
+                    finally {
+                        if (reserved)
+                            locPart.release();
+                    }
+                }
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void onEvicted(GridDhtLocalPartition part, boolean 
updateSeq) {
         ctx.database().checkpointReadLock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 337553b..dc4bfe9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -57,6 +57,7 @@ import 
org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -282,6 +283,14 @@ public class GridDhtPartitionDemander {
 
             final RebalanceFuture fut = new RebalanceFuture(grp, assignments, 
log, rebalanceId);
 
+            if (!grp.localWalEnabled())
+                fut.listen(new 
IgniteInClosureX<IgniteInternalFuture<Boolean>>() {
+                    @Override public void applyx(IgniteInternalFuture<Boolean> 
future) throws IgniteCheckedException {
+                        if (future.get())
+                            
ctx.walState().onGroupRebalanceFinished(grp.groupId(), 
assignments.topologyVersion());
+                    }
+                });
+
             if (!oldFut.isInitial())
                 oldFut.cancel();
             else
@@ -722,9 +731,7 @@ public class GridDhtPartitionDemander {
                                 // If message was last for this partition,
                                 // then we take ownership.
                                 if (last) {
-                                    top.own(part);
-
-                                    fut.partitionDone(nodeId, p);
+                                    fut.partitionDone(nodeId, p, true);
 
                                     if (log.isDebugEnabled())
                                         log.debug("Finished rebalancing 
partition: " + part);
@@ -737,14 +744,14 @@ public class GridDhtPartitionDemander {
                         }
                         else {
                             if (last)
-                                fut.partitionDone(nodeId, p);
+                                fut.partitionDone(nodeId, p, false);
 
                             if (log.isDebugEnabled())
                                 log.debug("Skipping rebalancing partition 
(state is not MOVING): " + part);
                         }
                     }
                     else {
-                        fut.partitionDone(nodeId, p);
+                        fut.partitionDone(nodeId, p, false);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (it does 
not belong on current node): " + p);
@@ -762,7 +769,7 @@ public class GridDhtPartitionDemander {
             }
 
             for (Integer miss : supply.missed())
-                fut.partitionDone(nodeId, miss);
+                fut.partitionDone(nodeId, miss, false);
 
             GridDhtPartitionDemandMessage d = new 
GridDhtPartitionDemandMessage(
                 supply.rebalanceId(),
@@ -1064,8 +1071,11 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param p Partition number.
          */
-        private void partitionDone(UUID nodeId, int p) {
+        private void partitionDone(UUID nodeId, int p, boolean updateState) {
             synchronized (this) {
+                if (updateState && grp.localWalEnabled())
+                    grp.topology().own(grp.topology().localPartition(p));
+
                 if (isDone())
                     return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0d57d48..a21d98e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1702,6 +1702,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (!grp.isLocal())
                     grp.topology().onExchangeDone(this, 
grp.affinity().readyAffinity(res), false);
             }
+
+            
cctx.walState().changeLocalStatesOnExchangeDone(exchId.topologyVersion());
         }
 
         if (super.onDone(res, err)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 8f81708..d314d50 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.io.Serializable;
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -285,7 +286,13 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     private static final String MBEAN_GROUP = "Persistent Store";
 
     /** WAL marker prefix for meta store. */
-    private static final String WAL_KEY_PREFIX = "grp-wal-disabled-";
+    private static final String WAL_KEY_PREFIX = "grp-wal-";
+
+    /** Global WAL disabled marker prefix for meta store. */
+    private static final String WAL_GLOBAL_KEY_PREFIX = WAL_KEY_PREFIX + 
"disabled-";
+
+    /** Local WAL disabled marker prefix for meta store. */
+    private static final String WAL_LOCAL_KEY_PREFIX = WAL_KEY_PREFIX + 
"local-disabled-";
 
     /** WAL marker predicate for meta store. */
     private static final IgnitePredicate<String> WAL_KEY_PREFIX_PRED = new 
IgnitePredicate<String>() {
@@ -385,7 +392,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     private List<MetastorageLifecycleListener> metastorageLifecycleLsnrs;
 
     /** Initially disabled cache groups. */
-    private Collection<Integer> initiallyWalDisabledGrps;
+    private Collection<Integer> initiallyGlobalWalDisabledGrps = new 
HashSet<>();
+
+    private Collection<Integer> initiallyLocalWalDisabledGrps = new 
HashSet<>();
 
     /** File I/O factory for writing checkpoint markers. */
     private final FileIOFactory ioFactory;
@@ -596,7 +605,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                 applyLastUpdates(status, true);
 
-                initiallyWalDisabledGrps = walDisabledGroups();
+                fillWalDisabledGroups();
 
                 notifyMetastorageReadyForRead();
             }
@@ -1974,7 +1983,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         int applied = 0;
         WALPointer lastRead = null;
 
-        Collection<Integer> ignoreGrps = storeOnly ? Collections.emptySet() : 
initiallyWalDisabledGrps;
+        Collection<Integer> ignoreGrps = storeOnly ? Collections.emptySet() :
+            F.concat(false, initiallyGlobalWalDisabledGrps, 
initiallyLocalWalDisabledGrps);
 
         try (WALIterator it = cctx.wal().replay(status.endPtr)) {
             while (it.hasNextX()) {
@@ -2228,7 +2238,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         long start = U.currentTimeMillis();
         int applied = 0;
 
-        Collection<Integer> ignoreGrps = metastoreOnly ? 
Collections.emptySet() : initiallyWalDisabledGrps;
+        Collection<Integer> ignoreGrps = metastoreOnly ? 
Collections.emptySet() :
+            F.concat(false, initiallyGlobalWalDisabledGrps, 
initiallyLocalWalDisabledGrps);
 
         try (WALIterator it = cctx.wal().replay(status.startPtr)) {
             Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new 
HashMap<>();
@@ -4401,13 +4412,16 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /** {@inheritDoc} */
-    @Override public boolean walEnabled(int grpId) {
-        return !initiallyWalDisabledGrps.contains(grpId);
+    @Override public boolean walEnabled(int grpId, boolean local) {
+        if (local)
+            return !initiallyLocalWalDisabledGrps.contains(grpId);
+        else
+            return !initiallyGlobalWalDisabledGrps.contains(grpId);
     }
 
     /** {@inheritDoc} */
-    @Override public void walEnabled(int grpId, boolean enabled) {
-        String key = walGroupIdToKey(grpId);
+    @Override public void walEnabled(int grpId, boolean enabled, boolean 
local) {
+        String key = walGroupIdToKey(grpId, local);
 
         checkpointReadLock();
 
@@ -4427,27 +4441,26 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
-     * @return List of initially WAL-disabled groups.
+     * Fills the initially global and local WAL-disabled group collections from meta store keys.
      */
-    private Collection<Integer> walDisabledGroups() {
+    private void fillWalDisabledGroups() {
         MetaStorage meta = cctx.database().metaStorage();
 
         try {
             Set<String> keys = 
meta.readForPredicate(WAL_KEY_PREFIX_PRED).keySet();
 
             if (keys.isEmpty())
-                return Collections.emptySet();
-
-            HashSet<Integer> res = new HashSet<>(keys.size());
+                return;
 
             for (String key : keys) {
-                int grpId = walKeyToGroupId(key);
+                T2<Integer, Boolean> t2 = walKeyToGroupIdAndLocalFlag(key);
 
-                res.add(grpId);
+                if (t2.get2())
+                    initiallyLocalWalDisabledGrps.add(t2.get1());
+                else
+                    initiallyGlobalWalDisabledGrps.add(t2.get1());
             }
 
-            return res;
-
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to read cache groups WAL 
state.", e);
@@ -4460,8 +4473,11 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      * @param grpId Group ID.
      * @return Key.
      */
-    private static String walGroupIdToKey(int grpId) {
-        return WAL_KEY_PREFIX + grpId;
+    private static String walGroupIdToKey(int grpId, boolean local) {
+        if (local)
+            return WAL_LOCAL_KEY_PREFIX + grpId;
+        else
+            return WAL_GLOBAL_KEY_PREFIX + grpId;
     }
 
     /**
@@ -4470,7 +4486,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      * @param key Key.
      * @return Group ID.
      */
-    private static int walKeyToGroupId(String key) {
-        return Integer.parseInt(key.substring(WAL_KEY_PREFIX.length()));
+    private static T2<Integer, Boolean> walKeyToGroupIdAndLocalFlag(String 
key) {
+        if (key.startsWith(WAL_LOCAL_KEY_PREFIX))
+            return new 
T2<>(Integer.parseInt(key.substring(WAL_LOCAL_KEY_PREFIX.length())), true);
+        else
+            return new 
T2<>(Integer.parseInt(key.substring(WAL_GLOBAL_KEY_PREFIX.length())), false);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 8746dca..bf080b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1035,7 +1035,7 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
      * @param grpId Group ID.
      * @return WAL enabled flag.
      */
-    public boolean walEnabled(int grpId) {
+    public boolean walEnabled(int grpId, boolean local) {
         return false;
     }
 
@@ -1045,7 +1045,7 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
      * @param grpId Group id.
      * @param enabled flag.
      */
-    public void walEnabled(int grpId, boolean enabled) {
+    public void walEnabled(int grpId, boolean enabled, boolean local) {
         // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 837f3d0..1c1b3e2 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -845,16 +845,22 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
 
     /** {@inheritDoc} */
     @Override public void beforeCacheGroupStart(CacheGroupDescriptor grpDesc) {
-        if (grpDesc.persistenceEnabled() && 
!cctx.database().walEnabled(grpDesc.groupId())) {
-            File dir = cacheWorkDir(grpDesc.config());
+        if (grpDesc.persistenceEnabled()) {
+            boolean localEnabled = 
cctx.database().walEnabled(grpDesc.groupId(), true);
+            boolean globalEnabled = 
cctx.database().walEnabled(grpDesc.groupId(), false);
 
-            assert dir.exists();
+            if (!localEnabled || !globalEnabled) {
+                File dir = cacheWorkDir(grpDesc.config());
 
-            boolean res = IgniteUtils.delete(dir);
+                assert dir.exists();
 
-            assert res;
+                boolean res = IgniteUtils.delete(dir);
 
-            grpDesc.walEnabled(false);
+                assert res;
+
+                if (!globalEnabled)
+                    grpDesc.walEnabled(false);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
new file mode 100644
index 0000000..07653f2
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests local WAL mode changes of a cache group during rebalancing.
+ */
+public class LocalWalModeChangeDuringRebalancingSelfTest extends 
GridCommonAbstractTest {
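+    /** Value set to IGNITE_DISABLE_WAL_DURING_REBALANCING system property when node configuration is built. */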
+    private static boolean disableWalDuringRebalancing = true;
+
+    /** When set, sending of supply messages for the default cache blocks until the latch is released. */
+    private static final AtomicReference<CountDownLatch> supplyMessageLatch = 
new AtomicReference<>();
+
+    /** When set, file writes from threads named like "checkpoint" block until the latch is released. */
+    private static final AtomicReference<CountDownLatch> fileIOLatch = new 
AtomicReference<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(200 * 1024 * 1024)
+                        .setInitialSize(200 * 1024 * 1024)
+                )
+                // Test verifies checkpoint count, so it is essential that no 
checkpoint is triggered by timeout
+                .setCheckpointFrequency(999_999_999_999L)
+                .setFileIOFactory(new TestFileIOFactory(new 
DataStorageConfiguration().getFileIOFactory()))
+        );
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration(DEFAULT_CACHE_NAME)
+                // Test checks internal state before and after rebalance, so 
rebalancing is configured to be triggered manually
+                .setRebalanceDelay(-1)
+        );
+
+        cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+            @Override public void sendMessage(ClusterNode node, Message msg) 
throws IgniteSpiException {
+                if (msg instanceof GridIoMessage && 
((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                    int grpId = 
((GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message()).groupId();
+
+                    if (grpId == CU.cacheId(DEFAULT_CACHE_NAME)) {
+                        CountDownLatch latch0 = supplyMessageLatch.get();
+
+                        if (latch0 != null)
+                            try {
+                                latch0.await();
+                            }
+                            catch (InterruptedException ex) {
+                                throw new IgniteException(ex);
+                            }
+                    }
+                }
+
+                super.sendMessage(node, msg);
+            }
+
+            @Override public void sendMessage(ClusterNode node, Message msg,
+                IgniteInClosure<IgniteException> ackC) throws 
IgniteSpiException {
+                if (msg instanceof GridIoMessage && 
((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                    int grpId = 
((GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message()).groupId();
+
+                    if (grpId == CU.cacheId(DEFAULT_CACHE_NAME)) {
+                        CountDownLatch latch0 = supplyMessageLatch.get();
+
+                        if (latch0 != null)
+                            try {
+                                latch0.await();
+                            }
+                            catch (InterruptedException ex) {
+                                throw new IgniteException(ex);
+                            }
+                    }
+                }
+
+                super.sendMessage(node, msg, ackC);
+            }
+        });
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING,
+            Boolean.toString(disableWalDuringRebalancing));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        CountDownLatch msgLatch = supplyMessageLatch.get();
+
+        if (msgLatch != null) {
+            while (msgLatch.getCount() > 0)
+                msgLatch.countDown();
+
+            supplyMessageLatch.set(null);
+        }
+
+        CountDownLatch fileLatch = fileIOLatch.get();
+
+        if (fileLatch != null) {
+            while (fileLatch.getCount() > 0)
+                fileLatch.countDown();
+
+            fileIOLatch.set(null);
+        }
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        disableWalDuringRebalancing = true;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWalDisabledDuringRebalancing() throws Exception {
+        doTestSimple();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWalNotDisabledIfParameterSetToFalse() throws Exception {
+        disableWalDuringRebalancing = false;
+
+        doTestSimple();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestSimple() throws Exception {
+        Ignite ignite = startGrids(3);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for (int k = 0; k < 10_000; k++)
+            cache.put(k, k);
+
+        IgniteEx newIgnite = startGrid(3);
+
+        final GridCacheDatabaseSharedManager.CheckpointHistory cpHistory =
+            
((GridCacheDatabaseSharedManager)newIgnite.context().cache().context().database()).checkpointHistory();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return !cpHistory.checkpoints().isEmpty();
+            }
+        }, 10_000);
+
+        U.sleep(10); // To ensure timestamp granularity.
+
+        long newIgniteStartedTimestamp = System.currentTimeMillis();
+
+        ignite.cluster().setBaselineTopology(4);
+
+        CacheGroupContext grpCtx = 
newIgnite.cachex(DEFAULT_CACHE_NAME).context().group();
+
+        assertEquals(!disableWalDuringRebalancing, grpCtx.walEnabled());
+
+        U.sleep(10); // To ensure timestamp granularity.
+
+        long rebalanceStartedTimestamp = System.currentTimeMillis();
+
+        for (Ignite g : G.allGrids())
+            g.cache(DEFAULT_CACHE_NAME).rebalance();
+
+        awaitPartitionMapExchange();
+
+        assertTrue(grpCtx.walEnabled());
+
+        U.sleep(10); // To ensure timestamp granularity.
+
+        long rebalanceFinishedTimestamp = System.currentTimeMillis();
+
+        for (Integer k = 0; k < 1000; k++)
+            assertEquals("k=" + k, k, cache.get(k));
+
+        int checkpointsBeforeNodeStarted = 0;
+        int checkpointsBeforeRebalance = 0;
+        int checkpointsAfterRebalance = 0;
+
+        for (Long timestamp : cpHistory.checkpoints()) {
+            if (timestamp < newIgniteStartedTimestamp)
+                checkpointsBeforeNodeStarted++;
+            else if (timestamp >= newIgniteStartedTimestamp && timestamp < 
rebalanceStartedTimestamp)
+                checkpointsBeforeRebalance++;
+            else if (timestamp >= rebalanceStartedTimestamp && timestamp <= 
rebalanceFinishedTimestamp)
+                checkpointsAfterRebalance++;
+        }
+
+        assertEquals(1, checkpointsBeforeNodeStarted); // checkpoint on start
+        assertEquals(0, checkpointsBeforeRebalance);
+        assertEquals(disableWalDuringRebalancing ? 1 : 0, 
checkpointsAfterRebalance); // checkpoint if WAL was re-activated
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalAndGlobalWalStateInterdependence() throws Exception {
+        Ignite ignite = startGrids(3);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for (int k = 0; k < 10_000; k++)
+            cache.put(k, k);
+
+        IgniteEx newIgnite = startGrid(3);
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().nodes());
+
+        CacheGroupContext grpCtx = 
newIgnite.cachex(DEFAULT_CACHE_NAME).context().group();
+
+        assertFalse(grpCtx.walEnabled());
+
+        ignite.cluster().disableWal(DEFAULT_CACHE_NAME);
+
+        for (Ignite g : G.allGrids())
+            g.cache(DEFAULT_CACHE_NAME).rebalance();
+
+        awaitPartitionMapExchange();
+
+        assertFalse(grpCtx.walEnabled()); // WAL is globally disabled
+
+        ignite.cluster().enableWal(DEFAULT_CACHE_NAME);
+
+        assertTrue(grpCtx.walEnabled());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testParallelExchangeDuringRebalance() throws Exception {
+        doTestParallelExchange(supplyMessageLatch);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testParallelExchangeDuringCheckpoint() throws Exception {
+        doTestParallelExchange(fileIOLatch);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestParallelExchange(AtomicReference<CountDownLatch> 
latchRef) throws Exception {
+        Ignite ignite = startGrids(3);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for (int k = 0; k < 10_000; k++)
+            cache.put(k, k);
+
+        IgniteEx newIgnite = startGrid(3);
+
+        CacheGroupContext grpCtx = 
newIgnite.cachex(DEFAULT_CACHE_NAME).context().group();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        latchRef.set(latch);
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().nodes());
+
+        for (Ignite g : G.allGrids())
+            g.cache(DEFAULT_CACHE_NAME).rebalance();
+
+        assertFalse(grpCtx.walEnabled());
+
+        // TODO : test with client node as well
+        startGrid(4); // Trigger exchange
+
+        assertFalse(grpCtx.walEnabled());
+
+        latch.countDown();
+
+        assertFalse(grpCtx.walEnabled());
+
+        for (Ignite g : G.allGrids())
+            g.cache(DEFAULT_CACHE_NAME).rebalance();
+
+        awaitPartitionMapExchange();
+
+        assertTrue(grpCtx.walEnabled());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDataClearedAfterRestartWithDisabledWal() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for (int k = 0; k < 10_000; k++)
+            cache.put(k, k);
+
+        IgniteEx newIgnite = startGrid(1);
+
+        ignite.cluster().setBaselineTopology(2);
+
+        CacheGroupContext grpCtx = 
newIgnite.cachex(DEFAULT_CACHE_NAME).context().group();
+
+        assertFalse(grpCtx.localWalEnabled());
+
+        stopGrid(1);
+        stopGrid(0);
+
+        newIgnite = startGrid(1);
+
+        newIgnite.cluster().active(true);
+
+        newIgnite.cluster().setBaselineTopology(newIgnite.cluster().nodes());
+
+        cache = newIgnite.cache(DEFAULT_CACHE_NAME);
+
+        for (int k = 0; k < 10_000; k++)
+            assertFalse("k=" + k +", v=" + cache.get(k), cache.containsKey(k));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWalNotDisabledAfterShrinkingBaselineTopology() throws 
Exception {
+        Ignite ignite = startGrids(4);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        for (int k = 0; k < 10_000; k++)
+            cache.put(k, k);
+
+        for (Ignite g : G.allGrids()) {
+            CacheGroupContext grpCtx = 
((IgniteEx)g).cachex(DEFAULT_CACHE_NAME).context().group();
+
+            assertTrue(grpCtx.walEnabled());
+        }
+
+        stopGrid(2);
+
+        ignite.cluster().setBaselineTopology(5);
+
+        for (Ignite g : G.allGrids()) {
+            CacheGroupContext grpCtx = 
((IgniteEx)g).cachex(DEFAULT_CACHE_NAME).context().group();
+
+            assertTrue(grpCtx.walEnabled());
+
+            g.cache(DEFAULT_CACHE_NAME).rebalance();
+        }
+
+        awaitPartitionMapExchange();
+
+        for (Ignite g : G.allGrids()) {
+            CacheGroupContext grpCtx = 
((IgniteEx)g).cachex(DEFAULT_CACHE_NAME).context().group();
+
+            assertTrue(grpCtx.walEnabled());
+        }
+    }
+
+    /**
+     * File I/O factory that wraps every {@link FileIO} created by the delegate into {@link TestFileIO}.
+     */
+    private static class TestFileIOFactory implements FileIOFactory {
+        /** Factory that creates the wrapped file I/O. */
+        private final FileIOFactory delegate;
+
+        /**
+         * @param delegate Delegate.
+         */
+        TestFileIOFactory(FileIOFactory delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return new TestFileIO(delegate.create(file));
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws 
IOException {
+            return new TestFileIO(delegate.create(file, modes));
+        }
+    }
+
+    /**
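+     * File I/O that forwards all calls to the delegate; writes from checkpoint threads wait while fileIOLatch is set.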
+     */
+    private static class TestFileIO implements FileIO {
+        /** File I/O that all calls are forwarded to. */
+        private final FileIO delegate;
+
+        /**
+         * @param delegate Delegate.
+         */
+        TestFileIO(FileIO delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long position() throws IOException {
+            return delegate.position();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void position(long newPosition) throws IOException {
+            delegate.position(newPosition);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(ByteBuffer destBuf) throws IOException {
+            return delegate.read(destBuf);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(ByteBuffer destBuf, long position) throws 
IOException {
+            return delegate.read(destBuf, position);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(byte[] buf, int off, int len) throws 
IOException {
+            return delegate.read(buf, off, len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int write(ByteBuffer srcBuf) throws IOException {
+            CountDownLatch latch = fileIOLatch.get();
+
+            if (latch != null && 
Thread.currentThread().getName().contains("checkpoint"))
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException ex) {
+                    throw new IgniteException(ex);
+                }
+
+            return delegate.write(srcBuf);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int write(ByteBuffer srcBuf, long position) throws 
IOException {
+            CountDownLatch latch = fileIOLatch.get();
+
+            if (latch != null && 
Thread.currentThread().getName().contains("checkpoint"))
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException ex) {
+                    throw new IgniteException(ex);
+                }
+
+            return delegate.write(srcBuf, position);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(byte[] buf, int off, int len) throws 
IOException {
+            CountDownLatch latch = fileIOLatch.get();
+
+            if (latch != null && 
Thread.currentThread().getName().contains("checkpoint"))
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException ex) {
+                    throw new IgniteException(ex);
+                }
+
+            delegate.write(buf, off, len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public MappedByteBuffer map(int maxWalSegmentSize) throws 
IOException {
+            return delegate.map(maxWalSegmentSize);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void force() throws IOException {
+            delegate.force();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void force(boolean withMetadata) throws IOException {
+            delegate.force(withMetadata);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long size() throws IOException {
+            return delegate.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clear() throws IOException {
+            delegate.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IOException {
+            delegate.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index f955b11..ede537e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -27,6 +27,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchange
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAllBaselineNodesOnlineFullApiSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOfflineBaselineNodeFullApiSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOnlineNodeOutOfBaselineFullApiSelfTest;
@@ -135,5 +136,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         suite.addTestSuite(IgniteCheckpointDirtyPagesForLowLoadTest.class);
 
         suite.addTestSuite(IgnitePdsCorruptedStoreTest.class);
+
+        suite.addTestSuite(LocalWalModeChangeDuringRebalancingSelfTest.class);
     }
 }

Reply via email to