IGNITE-3477 - Remove waitForRent() in exchange future

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e956ed0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e956ed0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e956ed0

Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: 7e956ed07c70c6d4de0ccb60a222529ecef3ea9b
Parents: e4b05d5
Author: Alexey Goncharuk <[email protected]>
Authored: Thu Mar 9 15:09:42 2017 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Thu Mar 9 15:09:42 2017 +0300

----------------------------------------------------------------------
 .../pagemem/wal/IgniteWriteAheadLogManager.java |  27 ++-
 .../GridCachePartitionExchangeManager.java      |  27 ++-
 .../cache/IgniteCacheOffheapManager.java        |   3 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  57 +++--
 .../IgniteCacheDatabaseSharedManager.java       |  44 +++-
 .../dht/GridClientPartitionTopology.java        |  33 ++-
 .../distributed/dht/GridDhtCacheEntry.java      |   2 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  60 +++---
 .../distributed/dht/GridDhtLockFuture.java      |  29 ++-
 .../dht/GridDhtPartitionTopology.java           |  22 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 215 +++++++++----------
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   2 +-
 .../GridDhtPartitionDemandMessage.java          |  73 +++++--
 .../dht/preloader/GridDhtPartitionSupplier.java |  10 +-
 .../GridDhtPartitionSupplyMessageV2.java        |   1 -
 .../GridDhtPartitionsExchangeFuture.java        | 153 +++++++++++--
 .../preloader/GridDhtPartitionsFullMessage.java | 120 +++++++++--
 .../GridDhtPartitionsSingleMessage.java         |  77 ++++++-
 .../dht/preloader/GridDhtPreloader.java         | 106 ++++++---
 .../IgniteDhtPartitionCountersMap.java          |  64 ++++++
 .../IgniteDhtPartitionHistorySuppliersMap.java  | 107 +++++++++
 .../IgniteDhtPartitionsToReloadMap.java         |  88 ++++++++
 22 files changed, 1043 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index fb7a39b..ac785b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -59,8 +59,9 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
      * the underlying storage.
      *
      * @param ptr Optional pointer to sync. If {@code null}, will sync up to 
the latest record.
-     * @throws IgniteCheckedException If
-     * @throws StorageException
+     * @throws IgniteCheckedException If failed to fsync.
+     * @throws StorageException If IO exception occurred during the write. If 
an exception is thrown from this
+     *      method, the WAL will be invalidated and the node will be stopped.
      */
     public void fsync(WALPointer ptr) throws IgniteCheckedException, 
StorageException;
 
@@ -75,6 +76,22 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
     public WALIterator replay(WALPointer start) throws IgniteCheckedException, 
StorageException;
 
     /**
+     * Reserves WAL history starting from the provided pointer and prevents its 
deletion.
+     *
+     * @param start WAL pointer.
+     * @throws IgniteCheckedException If failed to reserve.
+     */
+    public boolean reserve(WALPointer start) throws IgniteCheckedException;
+
+    /**
+     * Releases WAL history starting from the provided pointer that was previously 
reserved with {@link #reserve(WALPointer)}.
+     *
+     * @param start WAL pointer.
+     * @throws IgniteCheckedException If failed to release.
+     */
+    public void release(WALPointer start) throws IgniteCheckedException;
+
+    /**
      * Gives a hint to WAL manager to clear entries logged before the given 
pointer. Some entries before the
      * the given pointer will be kept because there is a configurable WAL 
history size. Those entries may be used
      * for partial partition rebalancing.
@@ -83,4 +100,10 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
      * @return Number of deleted WAL segments.
      */
     public int truncate(WALPointer ptr);
+
+    /**
+     * @param ptr Pointer.
+     * @return {@code True} if the given pointer is located in a reserved segment.
+     */
+    public boolean reserved(WALPointer ptr);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 232dc83..695872f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -73,6 +73,8 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -822,7 +824,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return {@code True} if message was sent, {@code false} if node left 
grid.
      */
     private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
-        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, 
null, null, true);
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, 
null, null, null, null, true);
 
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + 
", msg=" + m + ']');
@@ -854,12 +856,17 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return Message.
      */
     public GridDhtPartitionsFullMessage 
createPartitionsFullMessage(Collection<ClusterNode> nodes,
-        final @Nullable GridDhtPartitionExchangeId exchId,
+        @Nullable final GridDhtPartitionExchangeId exchId,
         @Nullable GridCacheVersion lastVer,
+        @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+        @Nullable IgniteDhtPartitionsToReloadMap partsToReload,
         final boolean compress) {
         final GridDhtPartitionsFullMessage m = new 
GridDhtPartitionsFullMessage(exchId,
             lastVer,
-            exchId != null ? exchId.topologyVersion() : 
AffinityTopologyVersion.NONE);
+            exchId != null ? exchId.topologyVersion() : 
AffinityTopologyVersion.NONE,
+            partHistSuppliers,
+            partsToReload
+            );
 
         m.compress(compress);
 
@@ -1239,11 +1246,23 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue(), null) != 
null;
+                        updated |= top.update(null, entry.getValue(), null, 
msg.partsToReload(cctx.localNodeId(), cacheId)) != null;
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
                     refreshPartitions();
+
+                boolean hasMovingParts = false;
+
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    if (!cacheCtx.isLocal() && cacheCtx.started() && 
cacheCtx.topology().hasMovingPartitions()) {
+                        hasMovingParts = true;
+                        break;
+                    }
+                }
+
+                if (!hasMovingParts)
+                    cctx.database().releaseHistoryForPreloading();
             }
             else
                 exchangeFuture(msg.exchangeId(), null, null, 
null).onReceive(node, msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 4b85abe..d1ab787 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -82,10 +82,9 @@ public interface IgniteCacheOffheapManager extends 
GridCacheManager {
     public Iterable<CacheDataStore> cacheDataStores();
 
     /**
-     * @param p Partition ID.
      * @param store Data store.
      */
-    public void destroyCacheDataStore(int p, CacheDataStore store) throws 
IgniteCheckedException;
+    public void destroyCacheDataStore(CacheDataStore store) throws 
IgniteCheckedException;
 
     /**
      * TODO: GG-10884, used on only from initialValue.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 05eaf0a..5d1f7b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.GridStripedLock;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
@@ -92,9 +93,6 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
     protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = 
new ConcurrentHashMap<>();
 
     /** */
-    protected final CacheDataStore rmvdStore = new CacheDataStoreImpl(-1, 
null, null, null);
-
-    /** */
     protected PendingEntriesTree pendingEntries;
 
     /** */
@@ -112,6 +110,9 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
     /** */
     private int updateValSizeThreshold;
 
+    /** */
+    private GridStripedLock partStoreLock = new 
GridStripedLock(Runtime.getRuntime().availableProcessors());
+
     /** {@inheritDoc} */
     @Override public GridAtomicLong globalRemoveId() {
         return globalRmvId;
@@ -172,7 +173,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
         super.stop0(cancel, destroy);
 
         if (destroy && cctx.affinityNode())
-            destroyCacheDataStructures(destroy);
+            destroyCacheDataStructures();
     }
 
     /** {@inheritDoc} */
@@ -185,7 +186,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
     /**
      *
      */
-    protected void destroyCacheDataStructures(boolean destroy) {
+    protected void destroyCacheDataStructures() {
         assert cctx.affinityNode();
 
         try {
@@ -196,7 +197,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
                 pendingEntries.destroy();
 
             for (CacheDataStore store : partDataStores.values())
-                store.destroy();
+                destroyCacheDataStore(store);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e.getMessage(), e);
@@ -238,7 +239,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
         if (cctx.isLocal())
             return locCacheDataStore;
         else {
-            GridDhtLocalPartition part = cctx.topology().localPartition(p, 
AffinityTopologyVersion.NONE, false);
+            GridDhtLocalPartition part = cctx.topology().localPartition(p, 
AffinityTopologyVersion.NONE, false, true);
 
             return part != null ? part.dataStore() : null;
         }
@@ -278,7 +279,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
 
     /** {@inheritDoc} */
     @Override public long entriesCount(int part) {
-        if (cctx.isLocal()){
+        if (cctx.isLocal()) {
             assert part == 0;
 
             return locCacheDataStore.size();
@@ -709,15 +710,20 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
 
     /** {@inheritDoc} */
     @Override public final CacheDataStore createCacheDataStore(int p) throws 
IgniteCheckedException {
-        CacheDataStore dataStore = null;
-        CacheDataStore oldStore = null;
+        CacheDataStore dataStore;
+
+        partStoreLock.lock(p);
+
+        try {
+            assert !partDataStores.containsKey(p);
 
-        do {
             dataStore = createCacheDataStore0(p);
 
-            oldStore = partDataStores.putIfAbsent(p, dataStore);
+            partDataStores.put(p, dataStore);
+        }
+        finally {
+            partStoreLock.unlock(p);
         }
-        while (oldStore != null);
 
         return dataStore;
     }
@@ -763,15 +769,32 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
     }
 
     /** {@inheritDoc} */
-    @Override public void destroyCacheDataStore(int p, CacheDataStore store) 
throws IgniteCheckedException {
+    @Override public final void destroyCacheDataStore(CacheDataStore store) 
throws IgniteCheckedException {
+        int p = store.partId();
+
+        partStoreLock.lock(p);
+
         try {
-            partDataStores.remove(p, store);
+            boolean removed = partDataStores.remove(p, store);
+
+            assert removed;
 
-            store.destroy();
+            destroyCacheDataStore0(store);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
         }
+        finally {
+            partStoreLock.unlock(p);
+        }
+    }
+
+    /**
+     * @param store Cache data store.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void destroyCacheDataStore0(CacheDataStore store) throws 
IgniteCheckedException {
+        store.destroy();
     }
 
     /**
@@ -1155,7 +1178,7 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
             KeyCacheObject key,
             CacheObject oldVal,
             CacheObject newVal
-            ) throws IgniteCheckedException {
+        ) {
             // In case we deal with IGFS cache, count updated data
             if (cctx.cache().isIgfsDataCache() &&
                 !cctx.isNear() &&

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
index 95ed3ae..743cd77 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
@@ -19,9 +19,10 @@ package org.apache.ignite.internal.processors.cache.database;
 
 import java.io.File;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -95,7 +96,7 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
     /**
      * @throws IgniteCheckedException If failed.
      */
-    public void initDataBase() throws IgniteCheckedException{
+    public void initDataBase() throws IgniteCheckedException {
         // No-op.
     }
 
@@ -152,7 +153,7 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
     /**
      *
      */
-    public void unLock(){
+    public void unLock() {
 
     }
 
@@ -212,7 +213,8 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
      * @return Snapshot creation init future or {@code null} if snapshot is 
not available.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public IgniteInternalFuture 
startLocalSnapshotOperation(StartSnapshotOperationAckDiscoveryMessage 
snapshotMsg)
+    @Nullable public IgniteInternalFuture startLocalSnapshotOperation(
+        StartSnapshotOperationAckDiscoveryMessage snapshotMsg)
         throws IgniteCheckedException {
         return null;
     }
@@ -225,6 +227,40 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
     }
 
     /**
+     * Reserve update history for exchange.
+     *
+     * @return Reserved update counters per cache and partition.
+     */
+    public Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Release reserved update history.
+     */
+    public void releaseHistoryForExchange() {
+        // No-op
+    }
+
+    /**
+     * Reserve update history for preloading.
+     * @param cacheId Cache ID.
+     * @param partId Partition ID.
+     * @param cntr Update counter.
+     * @return True if successfully reserved.
+     */
+    public boolean reserveHistoryForPreloading(int cacheId, int partId, long 
cntr) {
+        return false;
+    }
+
+    /**
+     * Release reserved update history.
+     */
+    public void releaseHistoryForPreloading() {
+        // No-op
+    }
+
+    /**
      * @param dbCfg Database configuration.
      * @return Page memory instance.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index ca71f51..e63379a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +52,7 @@ import org.jetbrains.annotations.Nullable;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
 
 /**
  * Partition topology for node which does not have any local partitions.
@@ -353,6 +355,12 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public GridDhtLocalPartition localPartition(int p, 
AffinityTopologyVersion topVer,
+        boolean create, boolean showRenting) throws 
GridDhtInvalidPartitionException {
+        return localPartition(p, topVer, create);
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtLocalPartition localPartition(Object key, boolean 
create) {
         return localPartition(1, AffinityTopologyVersion.NONE, create);
     }
@@ -550,9 +558,11 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable 
GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionFullMap partMap,
-        Map<Integer, T2<Long, Long>> cntrMap) {
+    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable 
GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtPartitionFullMap partMap, Map<Integer, T2<Long, Long>> cntrMap, 
Set<Integer> partsToReload) {
+
+        GridDhtPartitionExchangeId exchId = exchFut != null ? 
exchFut.exchangeId() : null;
+
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", 
parts=" + fullMapString() + ']');
 
@@ -910,7 +920,9 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void setOwners(int p, Set<UUID> owners, boolean 
updateSeq) {
+    @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean 
haveHistory, boolean updateSeq) {
+        Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new 
HashSet<UUID>();
+
         lock.writeLock().lock();
 
         try {
@@ -918,8 +930,15 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
                 if (!e.getValue().containsKey(p))
                     continue;
 
-                if (e.getValue().get(p) == OWNING && 
!owners.contains(e.getKey()))
-                    e.getValue().put(p, MOVING);
+                if (e.getValue().get(p) == OWNING && 
!owners.contains(e.getKey())) {
+                    if (haveHistory)
+                        e.getValue().put(p, MOVING);
+                    else {
+                        e.getValue().put(p, RENTING);
+
+                        result.add(e.getKey());
+                    }
+                }
                 else if (owners.contains(e.getKey()))
                     e.getValue().put(p, OWNING);
             }
@@ -932,6 +951,8 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
         finally {
             lock.writeLock().unlock();
         }
+
+        return result;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 5b5cc3b..f1f4376 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -91,7 +91,7 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
         // Record this entry with partition.
         int p = cctx.affinity().partition(key);
 
-        locPart = ctx.topology().localPartition(p, topVer, true);
+        locPart = ctx.topology().localPartition(p, topVer, true, true);
 
         assert locPart != null : p;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4418f53..aeb40f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -138,13 +138,16 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      * reservation is released. */
     private volatile boolean shouldBeRenting;
 
+    /** Set if partition must be re-created and preloaded after eviction. */
+    private boolean reload;
+
     /**
      * @param cctx Context.
      * @param id Partition ID.
      * @param entryFactory Entry factory.
      */
-    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
-    GridDhtLocalPartition(GridCacheContext cctx, int id, 
GridCacheMapEntryFactory entryFactory) {
+    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") 
GridDhtLocalPartition(GridCacheContext cctx,
+        int id, GridCacheMapEntryFactory entryFactory) {
         assert cctx != null;
 
         this.id = id;
@@ -373,12 +376,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
         map.removeEntry(entry);
 
         // Attempt to evict.
-        try {
-            tryEvict();
-        }
-        catch (NodeStoppingException ignore) {
-            // No-op.
-        }
+        tryEvictAsync(false);
     }
 
     /**
@@ -514,12 +512,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
                 if ((reservations & 0xFFFF) == 0 && shouldBeRenting)
                     rent(true);
 
-                try {
-                    tryEvict();
-                }
-                catch (NodeStoppingException ignore) {
-                    // No-op.
-                }
+                tryEvictAsync(false);
 
                 break;
             }
@@ -530,7 +523,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      * @param stateToRestore State to restore.
      */
     public void restoreState(GridDhtPartitionState stateToRestore) {
-        state.set(((long)stateToRestore.ordinal())  <<  32);
+        state.set(((long)stateToRestore.ordinal()) << 32);
     }
 
     /**
@@ -632,10 +625,24 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
     }
 
     /**
+     * @return {@code True} if partition should be re-created after it is 
cleared.
+     */
+    public boolean reload() {
+        return reload;
+    }
+
+    /**
+     * @param value {@code reload} flag value.
+     */
+    public void reload(boolean value) {
+        reload = value;
+    }
+
+    /**
      * @param updateSeq Update sequence.
      * @return Future to signal that this node is no longer an owner or backup.
      */
-    IgniteInternalFuture<?> rent(boolean updateSeq) {
+    public IgniteInternalFuture<?> rent(boolean updateSeq) {
         long reservations = state.get();
 
         int ord = (int)(reservations >> 32);
@@ -663,8 +670,6 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      * @param updateSeq Update sequence.
      */
     void tryEvictAsync(boolean updateSeq) {
-        assert cctx.kernalContext().state().active();
-
         long reservations = state.get();
 
         int ord = (int)(reservations >> 32);
@@ -713,7 +718,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      *
      */
     private void clearEvicting() {
-       boolean free;
+        boolean free;
 
         while (true) {
             int cnt = evictGuard.get();
@@ -804,9 +809,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      */
     private void destroyCacheDataStore() {
         try {
-            CacheDataStore store = dataStore();
-
-            cctx.offheap().destroyCacheDataStore(id, store);
+            cctx.offheap().destroyCacheDataStore(dataStore());
         }
         catch (IgniteCheckedException e) {
             log.error("Unable to destroy cache data store on partition 
eviction [id=" + id + "]", e);
@@ -817,12 +820,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      *
      */
     void onUnlock() {
-        try {
-            tryEvict();
-        }
-        catch (NodeStoppingException ignore) {
-            // No-op.
-        }
+        tryEvictAsync(false);
     }
 
     /**
@@ -949,7 +947,11 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
                     try {
                         CacheDataRow row = it0.next();
 
-                        GridDhtCacheEntry cached = 
(GridDhtCacheEntry)cctx.cache().entryEx(row.key());
+                        GridDhtCacheEntry cached = (GridDhtCacheEntry) 
getEntry(row.key());
+
+                        if (cached == null || cached.obsolete())
+                            cached = (GridDhtCacheEntry) 
putEntryIfObsoleteOrAbsent(
+                                cctx.affinity().affinityTopologyVersion(), 
row.key(), null, true, false);
 
                         if (cached.clearInternal(clearVer, extras)) {
                             if (rec) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index b57ca57..7b5dee3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1262,17 +1262,24 @@ public final class GridDhtLockFuture extends 
GridCompoundIdentityFuture<Boolean>
                     try {
                         GridCacheEntryEx entry = cache0.entryEx(info.key(), 
topVer);
 
-                        if (entry.initialValue(info.value(),
-                            info.version(),
-                            info.ttl(),
-                            info.expireTime(),
-                            true, topVer,
-                            replicate ? DR_PRELOAD : DR_NONE,
-                            false)) {
-                            if (rec && !entry.isInternal())
-                                cctx.events().addEvent(entry.partition(), 
entry.key(), cctx.localNodeId(),
-                                    (IgniteUuid)null, null, 
EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
-                                    false, null, null, null, false);
+                        cctx.shared().database().checkpointReadLock();
+
+                        try {
+                            if (entry.initialValue(info.value(),
+                                info.version(),
+                                info.ttl(),
+                                info.expireTime(),
+                                true, topVer,
+                                replicate ? DR_PRELOAD : DR_NONE,
+                                false)) {
+                                if (rec && !entry.isInternal())
+                                    cctx.events().addEvent(entry.partition(), 
entry.key(), cctx.localNodeId(),
+                                        (IgniteUuid)null, null, 
EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
+                                        false, null, null, null, false);
+                            }
+                        }
+                        finally {
+                            cctx.shared().database().checkpointReadUnlock();
                         }
                     }
                     catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index ac3e2c8..37c3af9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -128,6 +128,18 @@ public interface GridDhtPartitionTopology {
         throws GridDhtInvalidPartitionException;
 
     /**
+     * @param topVer Topology version at the time of creation.
+     * @param p Partition ID.
+     * @param create If {@code true}, then partition will be created if it's 
not there.
+     * @return Local partition.
+     * @throws GridDhtInvalidPartitionException If partition is evicted or 
absent and
+     *      does not belong to this node.
+     */
+    @Nullable public GridDhtLocalPartition localPartition(int p, 
AffinityTopologyVersion topVer, boolean create,
+        boolean showRenting)
+        throws GridDhtInvalidPartitionException;
+
+    /**
      * @param parts Partitions to release (should be reserved before).
      */
     public void releasePartitions(int... parts);
@@ -213,14 +225,15 @@ public interface GridDhtPartitionTopology {
     public void onRemoved(GridDhtCacheEntry e);
 
     /**
-     * @param exchId Exchange ID.
+     * @param exchFut Exchange future.
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} 
otherwise.
      */
-    public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId 
exchId,
+    public GridDhtPartitionMap2 update(@Nullable 
GridDhtPartitionsExchangeFuture exchFut,
         GridDhtPartitionFullMap partMap,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap);
+        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
+        Set<Integer> partsToReload);
 
     /**
      * @param exchId Exchange ID.
@@ -301,6 +314,7 @@ public interface GridDhtPartitionTopology {
      * @param p Partition ID.
      * @param updateSeq If should increment sequence when updated.
      * @param owners Set of new owners.
+     * @return Set of node IDs that should reload partitions.
      */
-    public void setOwners(int p, Set<UUID> owners, boolean updateSeq);
+    public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, 
boolean updateSeq);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 66e5be3..553aa2a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -32,12 +32,10 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -59,7 +57,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
@@ -70,8 +67,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Partition topology.
  */
-@GridToStringExclude
-class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+@GridToStringExclude class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -189,93 +185,6 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : 
map.toString();
     }
 
-    /**
-     * Waits for renting partitions.
-     *
-     * @return {@code True} if mapping was changed.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean waitForRent() throws IgniteCheckedException {
-        final long longOpDumpTimeout =
-            
IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT,
 60_000);
-
-        int dumpCnt = 0;
-
-        GridDhtLocalPartition part;
-
-        for (int i = 0; i < locParts.length(); i++) {
-            part = locParts.get(i);
-
-            if (part == null)
-                continue;
-
-            GridDhtPartitionState state = part.state();
-
-            if (state == RENTING || state == EVICTED) {
-                if (log.isDebugEnabled())
-                    log.debug("Waiting for renting partition: " + part);
-
-                part.tryEvictAsync(false);
-
-                // Wait for partition to empty out.
-                if (longOpDumpTimeout > 0) {
-                    while (true) {
-                        try {
-                            part.rent(true).get(longOpDumpTimeout);
-
-                            break;
-                        }
-                        catch (IgniteFutureTimeoutCheckedException e) {
-                            if (dumpCnt++ < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
-                                U.warn(log, "Failed to wait for partition 
eviction [" +
-                                    "topVer=" + topVer +
-                                    ", cache=" + cctx.name() +
-                                    ", part=" + part.id() +
-                                    ", partState=" + part.state() +
-                                    ", size=" + part.size() +
-                                    ", reservations=" + part.reservations() +
-                                    ", grpReservations=" + 
part.groupReserved() +
-                                    ", node=" + cctx.localNodeId() + "]");
-
-                                if 
(IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, 
false))
-                                    U.dumpThreads(log);
-                            }
-                        }
-                    }
-                }
-                else
-                    part.rent(true).get();
-
-                if (log.isDebugEnabled())
-                    log.debug("Finished waiting for renting partition: " + 
part);
-            }
-        }
-
-        // Remove evicted partition.
-        lock.writeLock().lock();
-
-        try {
-            boolean changed = false;
-
-            for (int i = 0; i < locParts.length(); i++) {
-                part = locParts.get(i);
-
-                if (part == null)
-                    continue;
-
-                if (part.state() == EVICTED) {
-                    locParts.set(i, null);
-                    changed = true;
-                }
-            }
-
-            return changed;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
     @Override public void readLock() {
@@ -422,7 +331,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             // If preloader is disabled, then we simply clear out
             // the partitions this node is not responsible for.
             for (int p = 0; p < num; p++) {
-                GridDhtLocalPartition locPart = localPartition(p, topVer, 
false, false);
+                GridDhtLocalPartition locPart = localPartition0(p, topVer, 
false, true, false);
 
                 boolean belongs = localNode(p, aff);
 
@@ -498,9 +407,6 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             && !cctx.kernalContext().clientNode()
         );
 
-        // Wait for rent outside of checkpoint lock.
-        waitForRent();
-
         ClusterNode loc = cctx.localNode();
 
         cctx.shared().database().checkpointReadLock();
@@ -580,16 +486,13 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         finally {
             cctx.shared().database().checkpointReadUnlock();
         }
-
-        // Wait for evictions.
-        waitForRent();
     }
 
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture 
exchFut) throws IgniteCheckedException {
         treatAllPartAsLoc = false;
 
-        boolean changed = waitForRent();
+        boolean changed = false;
 
         int num = cctx.affinity().partitions();
 
@@ -616,7 +519,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             for (int p = 0; p < num; p++) {
-                GridDhtLocalPartition locPart = localPartition(p, topVer, 
false, false);
+                GridDhtLocalPartition locPart = localPartition0(p, topVer, 
false, false, false);
 
                 if (cctx.affinity().partitionLocalNode(p, topVer)) {
                     // This partition will be created during next topology 
event,
@@ -697,7 +600,13 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     @Nullable @Override public GridDhtLocalPartition localPartition(int p, 
AffinityTopologyVersion topVer,
         boolean create)
         throws GridDhtInvalidPartitionException {
-        return localPartition(p, topVer, create, true);
+        return localPartition0(p, topVer, create, false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDhtLocalPartition localPartition(int p, 
AffinityTopologyVersion topVer,
+        boolean create, boolean showRenting) throws 
GridDhtInvalidPartitionException {
+        return localPartition0(p, topVer, create, showRenting, true);
     }
 
     /**
@@ -734,9 +643,10 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @return Local partition.
      */
     @SuppressWarnings("TooBroadScope")
-    private GridDhtLocalPartition localPartition(int p,
+    private GridDhtLocalPartition localPartition0(int p,
         AffinityTopologyVersion topVer,
         boolean create,
+        boolean showRenting,
         boolean updateSeq) {
         GridDhtLocalPartition loc;
 
@@ -744,7 +654,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
         GridDhtPartitionState state = loc != null ? loc.state() : null;
 
-        if (loc != null && state != EVICTED && (state != RENTING || 
!cctx.allowFastEviction()))
+        if (loc != null && state != EVICTED && (state != RENTING || 
showRenting))
             return loc;
 
         if (!create)
@@ -769,8 +679,9 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         "(often may be caused by inconsistent 'key.hashCode()' 
implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" 
+ this.topVer + ']');
             }
-            else if (loc != null && state == RENTING && 
cctx.allowFastEviction())
-                throw new GridDhtInvalidPartitionException(p, "Adding entry to 
partition that is concurrently evicted.");
+            else if (loc != null && state == RENTING && !showRenting)
+                throw new GridDhtInvalidPartitionException(p, "Adding entry to 
partition that is concurrently evicted " +
+                    "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
 
             if (loc == null) {
                 if (!treatAllPartAsLoc && !belongs)
@@ -1062,10 +973,13 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap2 update(
-        @Nullable GridDhtPartitionExchangeId exchId,
+        @Nullable GridDhtPartitionsExchangeFuture exchFut,
         GridDhtPartitionFullMap partMap,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap
+        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
+        Set<Integer> partsToReload
     ) {
+        GridDhtPartitionExchangeId exchId = exchFut != null ? 
exchFut.exchangeId() : null;
+
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", 
parts=" + fullMapString() + ']');
 
@@ -1117,8 +1031,6 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 return null;
             }
 
-            long updateSeq = this.updateSeq.incrementAndGet();
-
             if (exchId != null)
                 lastExchangeId = exchId;
 
@@ -1186,11 +1098,32 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     int p = e.getKey();
                     GridDhtPartitionState state = e.getValue();
 
-                   if (state == MOVING) {
+                    if (state == OWNING) {
                         GridDhtLocalPartition locPart = locParts.get(p);
 
                         assert locPart != null;
 
+                        if (cntrMap != null) {
+                            T2<Long, Long> cntr = cntrMap.get(p);
+
+                            if (cntr != null && cntr.get2() > 
locPart.updateCounter())
+                                locPart.updateCounter(cntr.get2());
+                        }
+
+                        if (locPart.state() == MOVING) {
+                            boolean success = locPart.own();
+
+                            assert success : locPart;
+
+                            changed |= success;
+                        }
+                    }
+                    else if (state == MOVING) {
+                        GridDhtLocalPartition locPart = locParts.get(p);
+
+                        if (locPart == null || locPart.state() == EVICTED)
+                            locPart = createPartition(p);
+
                         if (locPart.state() == OWNING) {
                             locPart.moving();
 
@@ -1204,9 +1137,30 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 locPart.updateCounter(cntr.get2());
                         }
                     }
+                    else if (state == RENTING && partsToReload.contains(p)) {
+                        GridDhtLocalPartition locPart = locParts.get(p);
+
+                        if (locPart == null || locPart.state() == EVICTED) {
+                            createPartition(p);
+
+                            changed = true;
+                        }
+                        else if (locPart.state() == OWNING || locPart.state() 
== MOVING) {
+                            locPart.reload(true);
+
+                            locPart.rent(false);
+
+                            changed = true;
+                        }
+                        else {
+                            locPart.reload(true);
+                        }
+                    }
                 }
             }
 
+            long updateSeq = this.updateSeq.incrementAndGet();
+
             if (!affVer.equals(AffinityTopologyVersion.NONE) && 
affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = 
cctx.affinity().assignments(topVer);
 
@@ -1536,24 +1490,42 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void setOwners(int p, Set<UUID> owners, boolean 
updateSeq) {
+    @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean 
haveHistory, boolean updateSeq) {
+        Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new 
HashSet<UUID>();
+
         lock.writeLock().lock();
 
         try {
-
             GridDhtLocalPartition locPart = locParts.get(p);
 
             if (locPart != null) {
-                if (locPart.state() == OWNING && 
!owners.contains(cctx.localNodeId()))
-                    locPart.moving();
+                if (locPart.state() == OWNING && 
!owners.contains(cctx.localNodeId())) {
+                    if (haveHistory)
+                        locPart.moving();
+                    else {
+                        locPart.rent(false);
+
+                        locPart.reload(true);
+
+                        result.add(cctx.localNodeId());
+                    }
+
+                }
             }
 
             for (Map.Entry<UUID, GridDhtPartitionMap2> e : 
node2part.entrySet()) {
                 if (!e.getValue().containsKey(p))
                     continue;
 
-                if (e.getValue().get(p) == OWNING && 
!owners.contains(e.getKey()))
-                    e.getValue().put(p, MOVING);
+                if (e.getValue().get(p) == OWNING && 
!owners.contains(e.getKey())) {
+                    if (haveHistory)
+                        e.getValue().put(p, MOVING);
+                    else {
+                        e.getValue().put(p, RENTING);
+
+                        result.add(e.getKey());
+                    }
+                }
             }
 
             if (updateSeq)
@@ -1562,6 +1534,8 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         finally {
             lock.writeLock().unlock();
         }
+
+        return result;
     }
 
     /**
@@ -1629,6 +1603,8 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                     // If all affinity nodes are owners, then evict partition 
from local node.
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
+                        part.reload(false);
+
                         part.rent(false);
 
                         updateSeq = updateLocal(part.id(), part.state(), 
updateSeq);
@@ -1654,6 +1630,8 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 ClusterNode n = sorted.get(i);
 
                                 if (locId.equals(n.id())) {
+                                    part.reload(false);
+
                                     part.rent(false);
 
                                     updateSeq = updateLocal(part.id(), 
part.state(), updateSeq);
@@ -1833,6 +1811,9 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             long seq = updateSeq ? this.updateSeq.incrementAndGet() : 
this.updateSeq.get();
 
+            if (part.reload())
+                part = createPartition(part.id());
+
             updateLocal(part.id(), part.state(), seq);
 
             consistencyCheck();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index debf8b6..5169f27 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -147,7 +147,7 @@ public abstract class GridDhtTxLocalAdapter extends 
IgniteTxLocalAdapter {
             storeEnabled,
             onePhaseCommit,
             txSize,
-            subjId, 
+            subjId,
             taskNameHash
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index c9dcaaf..377c274 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -50,7 +50,11 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
     @GridDirectCollection(int.class)
     private Collection<Integer> parts;
 
-    /** Partition. */
+    /** Partitions that must be restored from history. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> historicalParts;
+
+    /** Partition counters. */
     @GridDirectMap(keyType = int.class, valueType = long.class)
     private Map<Integer, Long> partsCntrs;
 
@@ -85,7 +89,8 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
      * @param cp Message to copy from.
      * @param parts Partitions.
      */
-    GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, 
Collection<Integer> parts, Map<Integer, Long> partsCntrs) {
+    GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, 
Collection<Integer> parts,
+        Map<Integer, Long> partsCntrs) {
         cacheId = cp.cacheId;
         updateSeq = cp.updateSeq;
         topic = cp.topic;
@@ -96,6 +101,9 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
         // Create a copy of passed in collection since it can be modified when 
this message is being sent.
         this.parts = new HashSet<>(parts);
         this.partsCntrs = partsCntrs;
+
+        if (cp.historicalParts != null)
+            historicalParts = new HashSet<>(cp.historicalParts);
     }
 
     /**
@@ -108,13 +116,19 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
     /**
      * @param p Partition.
      */
-    void addPartition(int p) {
+    void addPartition(int p, boolean historical) {
         if (parts == null)
             parts = new HashSet<>();
 
         parts.add(p);
-    }
 
+        if (historical) {
+            if (historicalParts == null)
+                historicalParts = new HashSet<>();
+
+            historicalParts.add(p);
+        }
+    }
 
     /**
      * @return Partition.
@@ -124,6 +138,17 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
     }
 
     /**
+     * @param p Partition to check.
+     * @return {@code True} if historical.
+     */
+    boolean isHistorical(int p) {
+        if (historicalParts == null)
+            return false;
+
+        return historicalParts.contains(p);
+    }
+
+    /**
      * @param updateSeq Update sequence.
      */
     void updateSequence(long updateSeq) {
@@ -232,42 +257,48 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeCollection("parts", parts, 
MessageCollectionItemType.INT))
+                if (!writer.writeCollection("historicalParts", 
historicalParts, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("partsCntrs", partsCntrs, 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+                if (!writer.writeCollection("parts", parts, 
MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeLong("timeout", timeout))
+                if (!writer.writeMap("partsCntrs", partsCntrs, 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeLong("timeout", timeout))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("topicBytes", topicBytes))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeLong("updateSeq", updateSeq))
+                if (!writer.writeByteArray("topicBytes", topicBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
                 if (!writer.writeInt("workerId", workerId))
                     return false;
 
@@ -290,7 +321,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
-                parts = reader.readCollection("parts", 
MessageCollectionItemType.INT);
+                historicalParts = reader.readCollection("historicalParts", 
MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -298,7 +329,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                partsCntrs = reader.readMap("partsCntrs", 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+                parts = reader.readCollection("parts", 
MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -306,7 +337,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
                 reader.incrementState();
 
             case 5:
-                timeout = reader.readLong("timeout");
+                partsCntrs = reader.readMap("partsCntrs", 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -314,7 +345,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
                 reader.incrementState();
 
             case 6:
-                topVer = reader.readMessage("topVer");
+                timeout = reader.readLong("timeout");
 
                 if (!reader.isLastRead())
                     return false;
@@ -322,7 +353,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
                 reader.incrementState();
 
             case 7:
-                topicBytes = reader.readByteArray("topicBytes");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -330,7 +361,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
                 reader.incrementState();
 
             case 8:
-                updateSeq = reader.readLong("updateSeq");
+                topicBytes = reader.readByteArray("topicBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -338,6 +369,14 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
                 reader.incrementState();
 
             case 9:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
                 workerId = reader.readInt("workerId");
 
                 if (!reader.isLastRead())
@@ -357,7 +396,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 11;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b80ad04..c04a7b5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -278,10 +278,16 @@ class GridDhtPartitionSupplier {
                         IgniteRebalanceIterator iter;
 
                         if (sctx == null || sctx.entryIt == null) {
-                            iter = cctx.offheap().rebalanceIterator(part, 
d.topologyVersion(), d.partitionCounter(part));
+                            iter = cctx.offheap().rebalanceIterator(part, 
d.topologyVersion(),
+                                d.isHistorical(part) ? 
d.partitionCounter(part) : null);
+
+                            if (!iter.historical()) {
+                                assert 
!cctx.shared().database().persistenceEnabled() || !d.isHistorical(part);
 
-                            if (!iter.historical())
                                 s.clean(part);
+                            }
+                            else
+                                assert 
cctx.shared().database().persistenceEnabled() && d.isHistorical(part);
                         }
                         else
                             iter = (IgniteRebalanceIterator)sctx.entryIt;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index f4c624d..5cfd6e5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -235,7 +235,6 @@ public class GridDhtPartitionSupplyMessageV2 extends 
GridCacheMessage implements
     void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) 
throws IgniteCheckedException {
         assert info != null;
         assert (info.key() != null || info.keyBytes() != null);
-        assert info.value() != null;
 
         // Need to call this method to initialize info properly.
         marshalInfo(info, ctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4c179e6..6cba4f4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -218,11 +218,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private boolean exchangeOnChangeGlobalState;
 
     /** */
-    private final ConcurrentMap<UUID, GridDhtPartitionsAbstractMessage> msgs = 
new ConcurrentHashMap8<>();
+    private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new 
ConcurrentHashMap8<>();
+
+    /** */
+    private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = 
new IgniteDhtPartitionHistorySuppliersMap();
 
     /** Forced Rebalance future. */
     private GridFutureAdapter<Boolean> forcedRebFut;
 
+    /** */
+    private volatile Map<Integer, Map<Integer, Long>> partHistReserved;
+
+    /** */
+    private volatile IgniteDhtPartitionsToReloadMap partsToReload = new 
IgniteDhtPartitionsToReloadMap();
+
     /**
      * Dummy future created to trigger reassignments if partition
      * topology changed while preloading.
@@ -368,6 +377,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param partId Partition ID.
+     * @return ID of history supplier node or null if it doesn't exist.
+     */
+    @Nullable public UUID partitionHistorySupplier(int cacheId, int partId) {
+        return partHistSuppliers.getSupplier(cacheId, partId);
+    }
+
+    /**
      * @param cacheId Cache ID to check.
      * @param topVer Topology version.
      * @return {@code True} if cache was added during this exchange.
@@ -509,7 +527,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 DiscoveryCustomMessage msg = 
((DiscoveryCustomEvent)discoEvt).customMessage();
 
-                if (msg instanceof DynamicCacheChangeBatch){
+                if (msg instanceof DynamicCacheChangeBatch) {
                     assert !F.isEmpty(reqs);
 
                     exchange = onCacheChangeRequest(crdNode);
@@ -678,7 +696,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                     
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                 if (updateTop && clientTop != null)
-                    cacheCtx.topology().update(exchId, 
clientTop.partitionMap(true), clientTop.updateCounters(false));
+                    cacheCtx.topology().update(this, 
clientTop.partitionMap(true), clientTop.updateCounters(false), 
Collections.<Integer>emptySet());
             }
 
             top.updateTopologyVersion(exchId, this, updSeq, 
stopping(cacheCtx.cacheId()));
@@ -798,9 +816,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                     if (updateTop) {
                         for (GridClientPartitionTopology top : 
cctx.exchange().clientTopologies()) {
                             if (top.cacheId() == cacheCtx.cacheId()) {
-                                cacheCtx.topology().update(exchId,
+                                cacheCtx.topology().update(this,
                                     top.partitionMap(true),
-                                    top.updateCounters(false));
+                                    top.updateCounters(false),
+                                    Collections.<Integer>emptySet());
 
                                 break;
                             }
@@ -845,11 +864,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             cacheCtx.preloader().onTopologyChanged(this);
         }
 
+        cctx.database().releaseHistoryForPreloading();
+
+        // To correctly rebalance when persistence is enabled, it is necessary 
to reserve history within exchange.
+        partHistReserved = cctx.database().reserveHistoryForExchange();
+
         waitPartitionRelease();
 
         boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || 
affChangeMsg != null;
 
-        //todo check
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
                 continue;
@@ -1107,6 +1130,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         GridDhtPartitionsSingleMessage m = 
cctx.exchange().createPartitionsSingleMessage(
             node, exchangeId(), clientOnlyExchange, true);
 
+        Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
+
+        if (partHistReserved0 != null)
+            m.partitionHistoryCounters(partHistReserved0);
+
         if (exchangeOnChangeGlobalState && changeGlobalStateE != null)
             m.setException(changeGlobalStateE);
 
@@ -1133,6 +1161,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             nodes,
             exchangeId(),
             last != null ? last : cctx.versions().last(),
+            partHistSuppliers,
+            partsToReload,
             compress);
 
         if (exchangeOnChangeGlobalState && 
!F.isEmpty(changeGlobalStateExceptions))
@@ -1252,6 +1282,22 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
 
+        Map<T2<Integer, Integer>, Long> localReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
+
+        if (localReserved != null) {
+            for (Map.Entry<T2<Integer, Integer>, Long> e : 
localReserved.entrySet()) {
+                boolean success = cctx.database().reserveHistoryForPreloading(
+                    e.getKey().get1(), e.getKey().get2(), e.getValue());
+
+                if (!success) {
+                    // TODO: how to handle?
+                    err = new IgniteCheckedException("Could not reserve 
history");
+                }
+            }
+        }
+
+        cctx.database().releaseHistoryForExchange();
+
         if (super.onDone(res, err) && realExchange) {
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + 
cctx.localNodeId() + ", exchange= " + this +
@@ -1523,8 +1569,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
      */
     private void assignPartitionStates(GridDhtPartitionTopology top) {
         Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
+        Map<Integer, Long> minCntrs = new HashMap<>();
 
-        for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : 
msgs.entrySet()) {
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
msgs.entrySet()) {
             assert e.getValue().partitionUpdateCounters(top.cacheId()) != null;
 
             for (Map.Entry<Integer, T2<Long, Long>> e0 : 
e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) {
@@ -1534,12 +1581,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
                 GridDhtPartitionState state = top.partitionState(uuid, p);
 
-                if (state != GridDhtPartitionState.OWNING)
+                if (state != GridDhtPartitionState.OWNING && state != 
GridDhtPartitionState.MOVING)
                     continue;
 
-                Long cntr = e0.getValue().get1();
+                Long cntr = state == GridDhtPartitionState.MOVING ? 
e0.getValue().get1() : e0.getValue().get2();
 
                 if (cntr == null)
+                    cntr = 0L;
+
+                Long minCntr = minCntrs.get(p);
+
+                if (minCntr == null || minCntr > cntr)
+                    minCntrs.put(p, cntr);
+
+                if (state != GridDhtPartitionState.OWNING)
                     continue;
 
                 CounterWithNodes maxCntr = maxCntrs.get(p);
@@ -1555,29 +1610,86 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
             GridDhtPartitionState state = 
top.partitionState(cctx.localNodeId(), part.id());
 
+            if (state != GridDhtPartitionState.OWNING && state != 
GridDhtPartitionState.MOVING)
+                continue;
+
+            long cntr = state == GridDhtPartitionState.MOVING ? 
part.initialUpdateCounter() : part.updateCounter();
+
+            Long minCntr = minCntrs.get(part.id());
+
+            if (minCntr == null || minCntr > cntr)
+                minCntrs.put(part.id(), cntr);
+
             if (state != GridDhtPartitionState.OWNING)
                 continue;
 
             CounterWithNodes maxCntr = maxCntrs.get(part.id());
 
-            if (maxCntr == null || part.initialUpdateCounter() > maxCntr.cnt)
-                maxCntrs.put(part.id(), new 
CounterWithNodes(part.updateCounter(), cctx.localNodeId()));
-            else if (part.initialUpdateCounter() == maxCntr.cnt)
+            if (maxCntr == null || cntr > maxCntr.cnt)
+                maxCntrs.put(part.id(), new CounterWithNodes(cntr, 
cctx.localNodeId()));
+            else if (cntr == maxCntr.cnt)
                 maxCntr.nodes.add(cctx.localNodeId());
         }
 
         int entryLeft = maxCntrs.size();
 
+        Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
+
+        Map<Integer, Long> localReserved = partHistReserved0 != null ? 
partHistReserved0.get(top.cacheId()) : null;
+
+        Set<Integer> haveHistory = new HashSet<>();
+
+        for (Map.Entry<Integer, Long> e : minCntrs.entrySet()) {
+            int p = e.getKey();
+            long minCntr = e.getValue();
+
+            CounterWithNodes maxCntrObj = maxCntrs.get(p);
+
+            long maxCntr = maxCntrObj != null ? maxCntrObj.cnt : 0;
+
+            // If minimal counter is zero, do clean preloading.
+            if (minCntr == 0 || minCntr == maxCntr)
+                continue;
+
+            if (localReserved != null) {
+                Long localCntr = localReserved.get(p);
+
+                if (localCntr != null && localCntr <= minCntr &&
+                    maxCntrObj.nodes.contains(cctx.localNodeId())) {
+                    partHistSuppliers.put(cctx.localNodeId(), top.cacheId(), 
p, minCntr);
+
+                    haveHistory.add(p);
+
+                    continue;
+                }
+            }
+
+            for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : 
msgs.entrySet()) {
+                Long histCntr = 
e0.getValue().partitionHistoryCounters(top.cacheId()).get(p);
+
+                if (histCntr != null && histCntr <= minCntr && 
maxCntrObj.nodes.contains(e0.getKey())) {
+                    partHistSuppliers.put(e0.getKey(), top.cacheId(), p, 
minCntr);
+
+                    haveHistory.add(p);
+
+                    break;
+                }
+            }
+        }
+
         for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
             int p = e.getKey();
             long maxCntr = e.getValue().cnt;
 
-            entryLeft --;
+            entryLeft--;
 
             if (entryLeft != 0 && maxCntr == 0)
                 continue;
 
-            top.setOwners(p, e.getValue().nodes, entryLeft == 0);
+            Set<UUID> nodesToReload = top.setOwners(p, e.getValue().nodes, 
haveHistory.contains(p), entryLeft == 0);
+
+            for (UUID nodeId : nodesToReload)
+                partsToReload.put(nodeId, top.cacheId(), p);
         }
     }
 
@@ -1618,6 +1730,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         try {
             assert crd.isLocal();
 
+            assert partHistSuppliers.isEmpty();
+
             if 
(!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (!cacheCtx.isLocal())
@@ -1818,6 +1932,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) {
         cctx.versions().onExchange(msg.lastVersion().order());
 
+        assert partHistSuppliers.isEmpty();
+
+        partHistSuppliers.putAll(msg.partitionHistorySuppliers());
+
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : 
msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();
 
@@ -1826,12 +1944,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
             if (cacheCtx != null)
-                cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
+                cacheCtx.topology().update(this, entry.getValue(), cntrMap,
+                    msg.partsToReload(cctx.localNodeId(), cacheId));
             else {
                 ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
-                    cctx.exchange().clientTopology(cacheId, 
this).update(exchId, entry.getValue(), cntrMap);
+                    cctx.exchange().clientTopology(cacheId, this).update(this, 
entry.getValue(), cntrMap, Collections.<Integer>emptySet());
             }
         }
     }
@@ -2014,7 +2133,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                         }
 
                         if (crd0.isLocal()) {
-                            if (exchangeOnChangeGlobalState && 
changeGlobalStateE !=null)
+                            if (exchangeOnChangeGlobalState && 
changeGlobalStateE != null)
                                 changeGlobalStateExceptions.put(crd0.id(), 
changeGlobalStateE);
 
                             if (allReceived) {

Reply via email to