Repository: ignite
Updated Branches:
  refs/heads/ignite-2.4 6173365e6 -> 8484c191e


IGNITE-7514 Choose owner primary node after cluster restart


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8484c191
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8484c191
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8484c191

Branch: refs/heads/ignite-2.4
Commit: 8484c191e34c6e835d9e2e499d76866e0aaf10b9
Parents: 6173365
Author: Ilya Lantukh <[email protected]>
Authored: Wed Jan 24 17:54:09 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Wed Jan 24 17:56:10 2018 +0300

----------------------------------------------------------------------
 .../internal/events/DiscoveryCustomEvent.java   |  34 ++++
 .../cache/CacheAffinitySharedManager.java       |  76 ++++++---
 .../processors/cache/ClusterCachesInfo.java     |   1 +
 .../GridCachePartitionExchangeManager.java      |   9 +-
 .../dht/GridClientPartitionTopology.java        |   9 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   3 -
 .../dht/GridDhtPartitionTopology.java           |  10 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  47 +++++-
 .../GridDhtPartitionsExchangeFuture.java        | 134 +++++++++------
 .../GridCacheDatabaseSharedManager.java         |   4 +
 .../distributed/CacheBaselineTopologyTest.java  | 166 ++++++++++++++++++-
 11 files changed, 408 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
index b3c6a2d..3b12b38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java
@@ -21,7 +21,10 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Custom event.
@@ -85,4 +88,35 @@ public class DiscoveryCustomEvent extends DiscoveryEvent {
     @Override public String toString() {
         return S.toString(DiscoveryCustomEvent.class, this, super.toString());
     }
+
+    /**
+     * @param evt Discovery event.
+     * @return {@code True} if event is DiscoveryCustomEvent that requires centralized affinity assignment.
+     */
+    public static boolean requiresCentralizedAffinityAssignment(DiscoveryEvent evt) {
+        if (!(evt instanceof DiscoveryCustomEvent))
+            return false;
+
+        return requiresCentralizedAffinityAssignment(((DiscoveryCustomEvent)evt).customMessage());
+    }
+
+    /**
+     * @param msg Discovery custom message.
+     * @return {@code True} if message belongs to event that requires centralized affinity assignment.
+     */
+    public static boolean requiresCentralizedAffinityAssignment(@Nullable DiscoveryCustomMessage msg) {
+        if (msg == null)
+            return false;
+
+        if (msg instanceof ChangeGlobalStateMessage && ((ChangeGlobalStateMessage)msg).activate())
+            return true;
+
+        if (msg instanceof SnapshotDiscoveryMessage) {
+            SnapshotDiscoveryMessage snapMsg = (SnapshotDiscoveryMessage) msg;
+
+            return snapMsg.needExchange() && snapMsg.needAssignPartitions();
+        }
+
+        return false;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 63aba98..921701a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -57,7 +58,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
-import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -161,13 +161,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         ClusterNode node,
         AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state) {
-        if (state.transition() || !state.active())
+        if ((state.transition() || !state.active()) &&
+            !DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg))
             return;
 
         if (type == EVT_NODE_JOINED && node.isLocal())
             lastAffVer = null;
 
-        if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
+        if ((!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) ||
+            DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) {
             synchronized (mux) {
                 assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0;
 
@@ -1263,7 +1265,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param fut Current exchange future.
      * @param msg Finish exchange message.
      */
-    public void mergeExchangesOnServerLeft(final 
GridDhtPartitionsExchangeFuture fut,
+    public void applyAffinityFromFullMessage(final 
GridDhtPartitionsExchangeFuture fut,
         final GridDhtPartitionsFullMessage msg) {
         final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 
@@ -1396,7 +1398,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @return Computed difference with ideal affinity.
      * @throws IgniteCheckedException If failed.
      */
-    public  Map<Integer, CacheGroupAffinityMessage> 
onServerLeftWithExchangeMergeProtocol(
+    public Map<Integer, CacheGroupAffinityMessage> 
onServerLeftWithExchangeMergeProtocol(
         final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
     {
         final ExchangeDiscoveryEvents evts = fut.context().events();
@@ -1404,6 +1406,32 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         assert fut.context().mergeExchanges();
         assert evts.hasServerLeft();
 
+        return onReassignmentEnforced(fut);
+    }
+
+    /**
+     * @param fut Current exchange future.
+     * @return Computed difference with ideal affinity.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Map<Integer, CacheGroupAffinityMessage> 
onCustomEventWithEnforcedAffinityReassignment(
+        final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
+    {
+        assert 
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());
+
+        return onReassignmentEnforced(fut);
+    }
+
+    /**
+     * @param fut Current exchange future.
+     * @return Computed difference with ideal affinity.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(
+        final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
+    {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
         forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 AffinityTopologyVersion topVer = evts.topologyVersion();
@@ -1418,7 +1446,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             }
         });
 
-        Map<Integer, Map<Integer, List<Long>>> diff = 
initAffinityOnNodeLeft0(evts.topologyVersion(),
+        Map<Integer, Map<Integer, List<Long>>> diff = 
initAffinityBasedOnPartitionsAvailability(evts.topologyVersion(),
             fut,
             NODE_TO_ORDER,
             true);
@@ -1642,17 +1670,16 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * Called on exchange initiated by server node leave.
+     * Called on exchange initiated by server node leave or custom event with 
centralized affinity assignment.
      *
      * @param fut Exchange future.
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if affinity should be assigned by coordinator.
      */
-    public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, 
boolean crd) throws IgniteCheckedException {
-        ClusterNode leftNode = fut.firstEvent().eventNode();
-
-        assert !leftNode.isClient() : leftNode;
+    public boolean onCentralizedAffinityChange(final 
GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException 
{
+        assert (fut.events().hasServerLeft() && 
!fut.firstEvent().eventNode().isClient()) ||
+            
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()) : 
fut.firstEvent();
 
         if (crd) {
             // Need initialize CacheGroupHolders if this node become 
coordinator on this exchange.
@@ -2066,7 +2093,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> initFut) {
                     try {
-                        
resFut.onDone(initAffinityOnNodeLeft0(fut.initialVersion(), fut, NODE_TO_ID, 
false));
+                        
resFut.onDone(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), 
fut, NODE_TO_ID, false));
                     }
                     catch (IgniteCheckedException e) {
                         resFut.onDone(e);
@@ -2077,7 +2104,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             return resFut;
         }
         else
-            return new 
GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.initialVersion(), fut, 
NODE_TO_ID, false));
+            return new 
GridFinishedFuture<>(initAffinityBasedOnPartitionsAvailability(fut.initialVersion(),
 fut, NODE_TO_ID, false));
     }
 
     /**
@@ -2088,12 +2115,17 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @return Affinity assignment.
      * @throws IgniteCheckedException If failed.
      */
-    private <T> Map<Integer, Map<Integer, List<T>>> 
initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer,
+    private <T> Map<Integer, Map<Integer, List<T>>> 
initAffinityBasedOnPartitionsAvailability(final AffinityTopologyVersion topVer,
         final GridDhtPartitionsExchangeFuture fut,
         final IgniteClosure<ClusterNode, T> c,
         final boolean initAff)
         throws IgniteCheckedException {
-        final WaitRebalanceInfo waitRebalanceInfo = new 
WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
+        final boolean enforcedCentralizedAssignment =
+            
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());
+
+        final WaitRebalanceInfo waitRebalanceInfo = 
enforcedCentralizedAssignment ?
+            new WaitRebalanceInfo(fut.exchangeId().topologyVersion()) :
+            new 
WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
 
         final Collection<ClusterNode> aliveNodes = 
fut.context().events().discoveryCache().serverNodes();
 
@@ -2103,13 +2135,14 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
-                if (!grpHolder.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
+                if (!grpHolder.rebalanceEnabled ||
+                    (fut.cacheGroupAddedOnExchange(desc.groupId(), 
desc.receivedFrom()) && !enforcedCentralizedAssignment))
                     return;
 
                 AffinityTopologyVersion affTopVer = 
grpHolder.affinity().lastVersion();
 
-                assert affTopVer.topologyVersion() > 0 && 
!affTopVer.equals(topVer) : "Invalid affinity version " +
-                    "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + 
desc.cacheOrGroupName() + ']';
+                assert (affTopVer.topologyVersion() > 0 && 
!affTopVer.equals(topVer)) || enforcedCentralizedAssignment :
+                    "Invalid affinity version [last=" + affTopVer + ", 
futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']';
 
                 List<List<ClusterNode>> curAssignment = 
grpHolder.affinity().assignments(affTopVer);
                 List<List<ClusterNode>> newAssignment = 
grpHolder.affinity().idealAssignment();
@@ -2141,6 +2174,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         ", node=" + newPrimary +
                         ", topVer=" + topVer + ']';
 
+                    List<ClusterNode> owners = top.owners(p);
+
+                    if (!owners.isEmpty() && !owners.contains(curPrimary))
+                        curPrimary = owners.get(0);
+
                     if (curPrimary != null && newPrimary != null && 
!curPrimary.equals(newPrimary)) {
                         if (aliveNodes.contains(curPrimary)) {
                             GridDhtPartitionState state = 
top.partitionState(newPrimary.id(), p);
@@ -2173,8 +2211,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                                 }
 
                                 if (newNodes0 == null) {
-                                    List<ClusterNode> owners = top.owners(p);
-
                                     for (ClusterNode owner : owners) {
                                         if (aliveNodes.contains(owner)) {
                                             newNodes0 = 
latePrimaryAssignment(grpHolder.affinity(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 08a910b..2b2fb55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -1186,6 +1186,7 @@ class ClusterCachesInfo {
     /**
      * @param msg Message.
      * @param topVer Current topology version.
+     * @param curState Current cluster state.
      * @return Exchange action.
      * @throws IgniteCheckedException If configuration validation failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 9b9284f..0a657c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -253,6 +253,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     return;
                 }
+                if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
+                    (((DiscoveryCustomEvent)evt).customMessage() instanceof CacheAffinityChangeMessage) &&
+                    ((CacheAffinityChangeMessage)((DiscoveryCustomEvent)evt).customMessage()).exchangeId() != null) {
+                    onDiscoveryEvent(evt, cache);
+
+                    return;
+                }
 
                 if (cache.state().transition()) {
                     if (log.isDebugEnabled())
@@ -965,7 +972,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
      * for non coordinator -  {@link GridDhtPartitionsSingleMessage SingleMessages} send
      */
-    private void refreshPartitions() {
+    public void refreshPartitions() {
         // TODO https://issues.apache.org/jira/browse/IGNITE-6857
         if (cctx.snapshot().snapshotOperationInProgress()) {
             scheduleResendPartitions();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index e994113..def00f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -256,9 +256,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+    @Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
         GridDhtPartitionsExchangeFuture exchFut) {
-        // No-op.
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -383,6 +383,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void afterStateRestored(AffinityTopologyVersion topVer) {
+        // no-op
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index e1f1d6f..e63aab6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -213,9 +213,6 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             // TODO ignite-db
             throw new IgniteException(e);
         }
-
-        // Todo log moving state
-        casState(state.get(), MOVING);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4ae68ef..13564c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -117,12 +117,20 @@ public interface GridDhtPartitionTopology {
     /**
      * @param affVer Affinity version.
      * @param exchFut Exchange future.
+     * @return {@code True} if partitions must be refreshed.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut)
+    public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut)
         throws IgniteInterruptedCheckedException;
 
     /**
+     * Initializes local data structures after partitions are restored from persistence.
+     *
+     * @param topVer Topology version.
+     */
+    public void afterStateRestored(AffinityTopologyVersion topVer);
+
+    /**
      * Post-initializes this topology.
      *
      * @param exchFut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0a2c154..f3da42a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -299,10 +299,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void 
initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
+    @Override public boolean 
initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
         GridDhtPartitionsExchangeFuture exchFut)
         throws IgniteInterruptedCheckedException
     {
+        boolean needRefresh;
+
         ctx.database().checkpointReadLock();
 
         try {
@@ -310,11 +312,11 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             try {
                 if (stopping)
-                    return;
+                    return false;
 
                 long updateSeq = this.updateSeq.incrementAndGet();
 
-                initPartitions0(affVer, exchFut, updateSeq);
+                needRefresh = initPartitions0(affVer, exchFut, updateSeq);
 
                 consistencyCheck();
             }
@@ -325,16 +327,21 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         finally {
             ctx.database().checkpointReadUnlock();
         }
+
+        return needRefresh;
     }
 
     /**
      * @param affVer Affinity version to use.
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
+     * @return {@code True} if partitions must be refreshed.
      */
-    private void initPartitions0(AffinityTopologyVersion affVer, 
GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+    private boolean initPartitions0(AffinityTopologyVersion affVer, 
GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
         List<List<ClusterNode>> aff = grp.affinity().readyAssignments(affVer);
 
+        boolean needRefresh = false;
+
         if (grp.affinityNode()) {
             ClusterNode loc = ctx.localNode();
 
@@ -366,6 +373,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)grp.shared().database();
 
                                 
locPart.restoreState(db.readPartitionState(grp, locPart.id()));
+
+                                needRefresh = true;
                             }
                             else {
                                 boolean owned = locPart.own();
@@ -423,6 +432,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         }
 
         updateRebalanceVersion(aff);
+
+        return needRefresh;
     }
 
     /**
@@ -617,6 +628,30 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void afterStateRestored(AffinityTopologyVersion topVer) {
+        lock.writeLock().lock();
+
+        try {
+            if (node2part == null)
+                return;
+
+            long updateSeq = this.updateSeq.incrementAndGet();
+
+            for (int p = 0; p < grp.affinity().partitions(); p++) {
+                GridDhtLocalPartition locPart = locParts.get(p);
+
+                if (locPart == null)
+                    updateLocal(p, EVICTED, updateSeq, topVer);
+                else
+                    updateLocal(p, locPart.state(), updateSeq, topVer);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture 
exchFut) {
         boolean changed = false;
 
@@ -996,9 +1031,11 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 map.put(i, part.state());
             }
 
+            GridDhtPartitionMap locPartMap = node2part != null ? 
node2part.get(ctx.localNodeId()) : null;
+
             return new GridDhtPartitionMap(ctx.localNodeId(),
                 updateSeq.get(),
-                readyTopVer,
+                locPartMap != null ? locPartMap.topologyVersion() : 
readyTopVer,
                 map,
                 true);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6c09b6a..9a247e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -227,6 +227,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private boolean centralizedAff;
 
+    /**
+     * Enforce affinity reassignment based on actual partition distribution. 
This mode should be used when partitions
+     * might be distributed not according to affinity assignment.
+     */
+    private boolean forceAffReassignment;
+
     /** Change global state exception. */
     private Exception changeGlobalStateE;
 
@@ -590,6 +596,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 DiscoveryCustomMessage msg = 
((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
 
+                forceAffReassignment = 
DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(msg);
+
                 if (msg instanceof ChangeGlobalStateMessage) {
                     assert exchActions != null && !exchActions.empty();
 
@@ -611,6 +619,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     exchange = onAffinityChangeRequest(crdNode);
                 }
 
+                if (forceAffReassignment)
+                    cctx.affinity().onCentralizedAffinityChange(this, crdNode);
+
                 initCoordinatorCaches(newCrd);
             }
             else {
@@ -756,7 +767,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     if (grp.isLocal())
                         continue;
 
-                    grp.topology().beforeExchange(this, !centralizedAff, 
false);
+                    grp.topology().beforeExchange(this, !centralizedAff && 
!forceAffReassignment, false);
                 }
             }
         }
@@ -899,8 +910,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         else if (req.activate()) {
             // TODO: BLT changes on inactive cluster can't be handled easily 
because persistent storage hasn't been initialized yet.
             try {
-                cctx.affinity().onBaselineTopologyChanged(this, crd);
-
                 if (CU.isPersistenceEnabled(cctx.kernalContext().config()) && 
!cctx.kernalContext().clientNode())
                     
cctx.kernalContext().state().onBaselineTopologyChanged(req.baselineTopology(),
                         req.prevBaselineTopologyHistoryItem());
@@ -937,7 +946,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @return Exchange type.
      */
     private ExchangeType onCustomMessageNoAffinityChange(boolean crd) {
-        cctx.affinity().onCustomMessageNoAffinityChange(this, crd, 
exchActions);
+        if (!forceAffReassignment)
+            cctx.affinity().onCustomMessageNoAffinityChange(this, crd, 
exchActions);
 
         return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : 
ExchangeType.ALL;
     }
@@ -992,7 +1002,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             exchCtx.events().warnNoAffinityNodes(cctx);
 
-            centralizedAff = cctx.affinity().onServerLeft(this, crd);
+            centralizedAff = cctx.affinity().onCentralizedAffinityChange(this, 
crd);
         }
         else
             cctx.affinity().onServerJoin(this, crd);
@@ -1072,7 +1082,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 // It is possible affinity is not initialized yet if node 
joins to cluster.
                 if (grp.affinity().lastVersion().topologyVersion() > 0)
-                    grp.topology().beforeExchange(this, !centralizedAff, 
false);
+                    grp.topology().beforeExchange(this, !centralizedAff && 
!forceAffReassignment, false);
             }
         }
 
@@ -1509,19 +1519,24 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
 
         if (err == null) {
-            if (centralizedAff) {
+            if (centralizedAff || forceAffReassignment) {
                 assert !exchCtx.mergeExchanges();
 
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal())
                         continue;
 
+                    boolean needRefresh = false;
+
                     try {
-                        grp.topology().initPartitionsWhenAffinityReady(res, 
this);
+                        needRefresh = 
grp.topology().initPartitionsWhenAffinityReady(res, this);
                     }
                     catch (IgniteInterruptedCheckedException e) {
                         U.error(log, "Failed to initialize partitions.", e);
                     }
+
+                    if (needRefresh)
+                        cctx.exchange().refreshPartitions();
                 }
             }
 
@@ -2317,7 +2332,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (!exchCtx.mergeExchanges() && 
!crd.equals(events().discoveryCache().serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (!grp.isLocal())
-                        grp.topology().beforeExchange(this, !centralizedAff, 
false);
+                        grp.topology().beforeExchange(this, !centralizedAff && 
!forceAffReassignment, false);
                 }
             }
 
@@ -2449,6 +2464,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     detectLostPartitions(resTopVer);
             }
 
+            if (!exchCtx.mergeExchanges() && forceAffReassignment)
+                idealAffDiff = 
cctx.affinity().onCustomEventWithEnforcedAffinityReassignment(this);
+
             for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
                 if (!grpCtx.isLocal())
                     grpCtx.topology().applyUpdateCounters();
@@ -2471,6 +2489,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (exchCtx.events().hasServerLeft())
                     msg.idealAffinityDiff(idealAffDiff);
             }
+            else if (forceAffReassignment)
+                msg.idealAffinityDiff(idealAffDiff);
 
             msg.prepareMarshal(cctx);
 
@@ -2524,65 +2544,69 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         nodes.addAll(sndResNodes);
                 }
 
-                IgniteCheckedException err = null;
+                if (!nodes.isEmpty())
+                    sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, 
joinedNodeAff);
 
-                if (stateChangeExchange()) {
-                    StateChangeRequest req = exchActions.stateChangeRequest();
+                if (!stateChangeExchange())
+                    onDone(exchCtx.events().topologyVersion(), null);
 
-                    assert req != null : exchActions;
+                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
pendingSingleMsgs.entrySet()) {
+                    if (log.isInfoEnabled()) {
+                        log.info("Process pending message on coordinator 
[node=" + e.getKey() +
+                            ", ver=" + initialVersion() +
+                            ", resVer=" + resTopVer + ']');
+                    }
 
-                    boolean stateChangeErr = false;
+                    processSingleMessage(e.getKey(), e.getValue());
+                }
+            }
 
-                    if (!F.isEmpty(changeGlobalStateExceptions)) {
-                        stateChangeErr = true;
+            if (stateChangeExchange()) {
+                IgniteCheckedException err = null;
 
-                        err = new IgniteCheckedException("Cluster state change 
failed.");
+                StateChangeRequest req = exchActions.stateChangeRequest();
 
-                        
cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, 
req);
-                    }
-                    else {
-                        boolean hasMoving = !partsToReload.isEmpty();
+                assert req != null : exchActions;
 
-                        Set<Integer> waitGrps = cctx.affinity().waitGroups();
+                boolean stateChangeErr = false;
 
-                        if (!hasMoving) {
-                            for (CacheGroupContext grpCtx : 
cctx.cache().cacheGroups()) {
-                                if (waitGrps.contains(grpCtx.groupId()) && 
grpCtx.topology().hasMovingPartitions()) {
-                                    hasMoving = true;
+                if (!F.isEmpty(changeGlobalStateExceptions)) {
+                    stateChangeErr = true;
 
-                                    break;
-                                }
+                    err = new IgniteCheckedException("Cluster state change 
failed.");
 
-                            }
-                        }
+                    
cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, 
req);
+                }
+                else {
+                    boolean hasMoving = !partsToReload.isEmpty();
 
-                        
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
-                    }
+                    Set<Integer> waitGrps = cctx.affinity().waitGroups();
+
+                    if (!hasMoving) {
+                        for (CacheGroupContext grpCtx : 
cctx.cache().cacheGroups()) {
+                            if (waitGrps.contains(grpCtx.groupId()) && 
grpCtx.topology().hasMovingPartitions()) {
+                                hasMoving = true;
 
-                    boolean active = !stateChangeErr && req.activate();
+                                break;
+                            }
 
-                    ChangeGlobalStateFinishMessage stateFinishMsg = new 
ChangeGlobalStateFinishMessage(
-                        req.requestId(),
-                        active,
-                        !stateChangeErr);
+                        }
+                    }
 
-                    cctx.discovery().sendCustomEvent(stateFinishMsg);
+                    
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
                 }
 
-                if (!nodes.isEmpty())
-                    sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, 
joinedNodeAff);
+                boolean active = !stateChangeErr && req.activate();
 
-                onDone(exchCtx.events().topologyVersion(), err);
+                ChangeGlobalStateFinishMessage stateFinishMsg = new 
ChangeGlobalStateFinishMessage(
+                    req.requestId(),
+                    active,
+                    !stateChangeErr);
 
-                for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
pendingSingleMsgs.entrySet()) {
-                    if (log.isInfoEnabled()) {
-                        log.info("Process pending message on coordinator 
[node=" + e.getKey() +
-                            ", ver=" + initialVersion() +
-                            ", resVer=" + resTopVer + ']');
-                    }
+                cctx.discovery().sendCustomEvent(stateFinishMsg);
 
-                    processSingleMessage(e.getKey(), e.getValue());
-                }
+                if (!centralizedAff)
+                    onDone(exchCtx.events().topologyVersion(), err);
             }
         }
         catch (IgniteCheckedException e) {
@@ -2925,7 +2949,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     cctx.affinity().onLocalJoin(this, msg, resTopVer);
                 else {
                     if (exchCtx.events().hasServerLeft())
-                        cctx.affinity().mergeExchangesOnServerLeft(this, msg);
+                        cctx.affinity().applyAffinityFromFullMessage(this, 
msg);
                     else
                         
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, false);
 
@@ -2939,6 +2963,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
             else if (localJoinExchange() && !exchCtx.fetchAffinityOnJoin())
                 cctx.affinity().onLocalJoin(this, msg, resTopVer);
+            else if (forceAffReassignment)
+                cctx.affinity().applyAffinityFromFullMessage(this, msg);
 
             updatePartitionFullMap(resTopVer, msg);
 
@@ -3048,6 +3074,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                             crd.isLocal(),
                             msg);
 
+                        IgniteCheckedException err = 
!F.isEmpty(msg.partitionsMessage().getErrorsMap()) ?
+                            new IgniteCheckedException("Cluster state change 
failed.") : null;
+
                         if (!crd.isLocal()) {
                             GridDhtPartitionsFullMessage partsMsg = 
msg.partitionsMessage();
 
@@ -3055,9 +3084,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                             assert partsMsg.lastVersion() != null : partsMsg;
 
                             updatePartitionFullMap(resTopVer, partsMsg);
+
+                            if (exchActions != null && 
exchActions.stateChangeRequest() != null && err != null)
+                                
cctx.kernalContext().state().onStateChangeError(msg.partitionsMessage().getErrorsMap(),
 exchActions.stateChangeRequest());
                         }
 
-                        onDone(resTopVer);
+                        onDone(resTopVer, err);
                     }
                     else {
                         if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a636464..b43827a 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -109,6 +109,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -2300,6 +2302,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     updateState(part, restore.get1());
                 }
             }
+
+            
grp.topology().afterStateRestored(grp.topology().lastTopologyChangeVersion());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8484c191/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index 5fbd752..7dc1bfa 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -25,13 +25,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -40,9 +43,16 @@ import 
org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestDelayingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -63,6 +73,9 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
     /** */
     private static final int NODE_COUNT = 4;
 
+    /** */
+    private static boolean delayRebalance = false;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -98,6 +111,9 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
         if (client)
             cfg.setClientMode(true);
 
+        if (delayRebalance)
+            cfg.setCommunicationSpi(new DelayRebalanceCommunicationSpi());
+
         return cfg;
     }
 
@@ -646,6 +662,83 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @throws Exception if failed.
+     */
+    public void testAffinityAssignmentChangedAfterRestart() throws Exception {
+        delayRebalance = false;
+
+        int parts = 32;
+
+        final List<Integer> partMapping = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++)
+            partMapping.add(p);
+
+        final AffinityFunction affFunc = new TestAffinityFunction(new 
RendezvousAffinityFunction(false, parts));
+
+        TestAffinityFunction.partsAffMapping = partMapping;
+
+        String cacheName = CACHE_NAME + 2;
+
+        startGrids(4);
+
+        IgniteEx ig = grid(0);
+
+        ig.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ig.createCache(
+            new CacheConfiguration<Integer, Integer>()
+                .setName(cacheName)
+                .setCacheMode(PARTITIONED)
+                .setBackups(1)
+                .setPartitionLossPolicy(READ_ONLY_SAFE)
+                .setReadFromBackup(true)
+                .setWriteSynchronizationMode(FULL_SYNC)
+                .setRebalanceDelay(-1)
+                .setAffinity(affFunc));
+
+        Map<Integer, String> keyToConsId = new HashMap<>();
+
+        for (int k = 0; k < 1000; k++) {
+            cache.put(k, k);
+
+            keyToConsId.put(k, 
ig.affinity(cacheName).mapKeyToNode(k).consistentId().toString());
+        }
+
+        stopAllGrids();
+
+        Collections.shuffle(TestAffinityFunction.partsAffMapping, new 
Random(1));
+
+        delayRebalance = true;
+
+        startGrids(4);
+
+        ig = grid(0);
+
+        ig.active(true);
+
+        cache = ig.cache(cacheName);
+
+        GridDhtPartitionFullMap partMap = 
ig.cachex(cacheName).context().topology().partitionMap(false);
+
+        for (int i = 1; i < 4; i++) {
+            IgniteEx ig0 = grid(i);
+
+            for (int p = 0; p < 32; p++)
+                
assertEqualsCollections(ig.affinity(cacheName).mapPartitionToPrimaryAndBackups(p),
 ig0.affinity(cacheName).mapPartitionToPrimaryAndBackups(p));
+        }
+
+        for (Map.Entry<Integer, String> e : keyToConsId.entrySet()) {
+            int p = ig.affinity(cacheName).partition(e.getKey());
+
+            assertEquals("p=" + p, GridDhtPartitionState.OWNING, 
partMap.get(ig.affinity(cacheName).mapKeyToNode(e.getKey()).id()).get(p));
+        }
+
+        for (int k = 0; k < 1000; k++)
+            assertEquals("k=" + k, Integer.valueOf(k), cache.get(k));
+    }
+
     /** */
     private Collection<BaselineNode> baselineNodes(Collection<ClusterNode> 
clNodes) {
         Collection<BaselineNode> res = new ArrayList<>(clNodes.size());
@@ -708,4 +801,73 @@ public class CacheBaselineTopologyTest extends 
GridCommonAbstractTest {
             return result;
         }
     }
+
+    /**
+     *
+     */
+    private static class TestAffinityFunction implements AffinityFunction {
+        /** */
+        private final AffinityFunction delegate;
+
+        /** */
+        private static List<Integer> partsAffMapping;
+
+        /** */
+        public TestAffinityFunction(AffinityFunction delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            delegate.reset();;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return delegate.partitions();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return delegate.partition(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> 
assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> res0 = delegate.assignPartitions(affCtx);
+
+            List<List<ClusterNode>> res = new ArrayList<>(res0.size());
+
+            for (int p = 0; p < res0.size(); p++)
+                res.add(p, null);
+
+            for (int p = 0; p < res0.size(); p++)
+                res.set(partsAffMapping.get(p), res0.get(p));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            delegate.removeNode(nodeId);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class DelayRebalanceCommunicationSpi extends 
TestDelayingCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override protected boolean delayMessage(Message msg, GridIoMessage 
ioMsg) {
+            if (msg != null && (msg instanceof GridDhtPartitionDemandMessage 
|| msg instanceof GridDhtPartitionSupplyMessage))
+                return true;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int delayMillis() {
+            return 1_000_000;
+        }
+    }
 }

Reply via email to