http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 7194b24..6d2f526 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -18,22 +18,24 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -44,7 +46,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
- * Thread pool for supplying partitions to demanding nodes.
+ * Class for supplying partitions to demanding nodes.
  */
 class GridDhtPartitionSupplier {
     /** */
@@ -56,13 +58,10 @@ class GridDhtPartitionSupplier {
     /** */
     private GridDhtPartitionTopology top;
 
-    /** */
-    private final boolean depEnabled;
-
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
-    /** Supply context map. T2: nodeId, idx, topVer. */
+    /** Supply context map. T3: nodeId, topicId, topVer. */
     private final Map<T3<UUID, Integer, AffinityTopologyVersion>, 
SupplyContext> scMap = new HashMap<>();
 
     /**
@@ -76,12 +75,10 @@ class GridDhtPartitionSupplier {
         log = grp.shared().logger(getClass());
 
         top = grp.topology();
-
-        depEnabled = grp.shared().gridDeploy().enabled();
     }
 
     /**
-     *
+     * Clears all supply contexts in case of node stopping.
      */
     void stop() {
         synchronized (scMap) {
@@ -98,7 +95,7 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * Clear context.
+     * Clears supply context.
      *
      * @param sc Supply context.
      * @param log Logger.
@@ -107,29 +104,21 @@ class GridDhtPartitionSupplier {
         final SupplyContext sc,
         final IgniteLogger log) {
         if (sc != null) {
-            final Iterator it = sc.entryIt;
+            final IgniteRebalanceIterator it = sc.iterator;
 
-            if (it != null && it instanceof GridCloseableIterator && 
!((GridCloseableIterator)it).isClosed()) {
+            if (it != null && !it.isClosed()) {
                 try {
-                    ((GridCloseableIterator)it).close();
+                    it.close();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Iterator close failed.", e);
                 }
             }
-
-            final GridDhtLocalPartition loc = sc.loc;
-
-            if (loc != null) {
-                assert loc.reservations() > 0;
-
-                loc.release();
-            }
         }
     }
 
     /**
-     * Handles new topology.
+     * Handles new topology version and clears supply context map of outdated 
contexts.
      *
      * @param topVer Topology version.
      */
@@ -154,7 +143,7 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * Sets preload predicate for supply pool.
+     * Sets preload predicate for this supplier.
      *
      * @param preloadPred Preload predicate.
      */
@@ -163,47 +152,58 @@ class GridDhtPartitionSupplier {
     }
 
     /**
+     * For each demand message method lookups (or creates new) supply context 
and starts to iterate entries across requested partitions.
+     * Each entry in iterator is placed to prepared supply message.
+     *
+     * If supply message size in bytes becomes greater than {@link 
CacheConfiguration#getRebalanceBatchSize()}
+     * method sends this message to demand node and saves partial state of 
iterated entries to supply context,
+     * then restores the context again after new demand message with the same 
context id is arrived.
+     *
+     * @param topicId Id of the topic is used for the supply-demand 
communication.
+     * @param nodeId Id of the node which sent the demand message.
      * @param d Demand message.
-     * @param idx Index.
-     * @param id Node uuid.
      */
     @SuppressWarnings("unchecked")
-    public void handleDemandMessage(int idx, UUID id, 
GridDhtPartitionDemandMessage d) {
+    public void handleDemandMessage(int topicId, UUID nodeId, 
GridDhtPartitionDemandMessage d) {
         assert d != null;
-        assert id != null;
+        assert nodeId != null;
 
-        AffinityTopologyVersion cutTop = grp.affinity().lastVersion();
+        AffinityTopologyVersion curTop = grp.affinity().lastVersion();
         AffinityTopologyVersion demTop = d.topologyVersion();
 
-        T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, 
demTop);
+        if (curTop.compareTo(demTop) > 0) {
+            if (log.isDebugEnabled())
+                log.debug("Demand request outdated [currentTopVer=" + curTop
+                        + ", demandTopVer=" + demTop
+                        + ", from=" + nodeId
+                        + ", topicId=" + topicId + "]");
 
-        if (d.updateSequence() == -1) { //Demand node requested context 
cleanup.
+            return;
+        }
+
+        T3<UUID, Integer, AffinityTopologyVersion> contextId = new 
T3<>(nodeId, topicId, demTop);
+
+        if (d.rebalanceId() < 0) { // Demand node requested context cleanup.
             synchronized (scMap) {
-                clearContext(scMap.remove(scId), log);
+                SupplyContext sctx = scMap.get(contextId);
+
+                if (sctx != null && sctx.rebalanceId == -d.rebalanceId()) {
+                    clearContext(scMap.remove(contextId), log);
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Stale context cleanup message " + d + ", 
supplyContext=" + sctx);
+                }
 
                 return;
             }
         }
 
-        if (cutTop.compareTo(demTop) > 0) {
-            if (log.isDebugEnabled())
-                log.debug("Demand request cancelled [current=" + cutTop + ", 
demanded=" + demTop +
-                    ", from=" + id + ", idx=" + idx + "]");
-
-            return;
-        }
-
         if (log.isDebugEnabled())
-            log.debug("Demand request accepted [current=" + cutTop + ", 
demanded=" + demTop +
-                ", from=" + id + ", idx=" + idx + "]");
+            log.debug("Demand request accepted [current=" + curTop + ", 
demanded=" + demTop +
+                ", from=" + nodeId + ", topicId=" + topicId + "]");
 
-        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
-            d.updateSequence(),
-            grp.groupId(),
-            d.topologyVersion(),
-            grp.deploymentEnabled());
-
-        ClusterNode node = grp.shared().discovery().node(id);
+        ClusterNode node = grp.shared().discovery().node(nodeId);
 
         if (node == null)
             return; // Context will be cleaned at topology change.
@@ -212,230 +212,198 @@ class GridDhtPartitionSupplier {
             SupplyContext sctx;
 
             synchronized (scMap) {
-                sctx = scMap.remove(scId);
+                sctx = scMap.remove(contextId);
 
-                assert sctx == null || d.updateSequence() == sctx.updateSeq;
+                if (sctx != null && d.rebalanceId() < sctx.rebalanceId) {
+                    // Stale message, return context back and return.
+                    scMap.put(contextId, sctx);
+                    return;
+                }
             }
 
-            // Initial demand request should contain partitions list.
-            if (sctx == null && d.partitions() == null)
+            // Demand request should not contain empty partitions if no supply 
context is associated with it.
+            if (sctx == null && (d.partitions() == null || 
d.partitions().isEmpty()))
                 return;
 
-            assert !(sctx != null && d.partitions() != null);
+            assert !(sctx != null && !d.partitions().isEmpty());
 
-            long bCnt = 0;
-
-            SupplyContextPhase phase = SupplyContextPhase.NEW;
-
-            boolean newReq = true;
+            long batchesCnt = 0;
 
             long maxBatchesCnt = 
grp.config().getRebalanceBatchesPrefetchCount();
 
             if (sctx != null) {
-                phase = sctx.phase;
-
                 maxBatchesCnt = 1;
             }
             else {
                 if (log.isDebugEnabled())
                     log.debug("Starting supplying rebalancing [cache=" + 
grp.cacheOrGroupName() +
                         ", fromNode=" + node.id() + ", partitionsCount=" + 
d.partitions().size() +
-                        ", topology=" + d.topologyVersion() + ", updateSeq=" + 
d.updateSequence() +
-                        ", idx=" + idx + "]");
+                        ", topology=" + demTop + ", rebalanceId=" + 
d.rebalanceId() +
+                        ", topicId=" + topicId + "]");
             }
 
-            Iterator<Integer> partIt = sctx != null ? sctx.partIt : 
d.partitions().iterator();
+            GridDhtPartitionSupplyMessage s = new 
GridDhtPartitionSupplyMessage(
+                    d.rebalanceId(),
+                    grp.groupId(),
+                    d.topologyVersion(),
+                    grp.deploymentEnabled());
+
+            IgniteRebalanceIterator iter;
+
+            Set<Integer> remainingParts;
+
+            if (sctx == null || sctx.iterator == null) {
+                iter = grp.offheap().rebalanceIterator(d.partitions(), 
d.topologyVersion());
+
+                remainingParts = new HashSet<>(d.partitions().fullSet());
+
+                CachePartitionPartialCountersMap histMap = 
d.partitions().historicalMap();
+
+                for (int i = 0; i < histMap.size(); i++) {
+                    int p = histMap.partitionAt(i);
+
+                    remainingParts.add(p);
+                }
+
+                for (Integer part : d.partitions().fullSet()) {
+                    if (iter.isPartitionMissing(part))
+                        continue;
 
-            if (sctx == null) {
-                for (Integer part : d.partitions()) {
                     GridDhtLocalPartition loc = top.localPartition(part, 
d.topologyVersion(), false);
 
-                    if (loc == null || loc.state() != OWNING)
+                    assert loc != null && loc.state() == 
GridDhtPartitionState.OWNING;
+
+                    
s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part));
+                }
+
+                for (int i = 0; i < histMap.size(); i++) {
+                    int p = histMap.partitionAt(i);
+
+                    if (iter.isPartitionMissing(p))
                         continue;
 
-                    if (grp.sharedGroup()) {
-                        for (int cacheId : grp.cacheIds())
-                            s.addKeysForCache(cacheId, 
grp.offheap().cacheEntriesCount(cacheId, part));
-                    }
-                    else
-                        
s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part));
+                    s.addEstimatedKeysCount(histMap.updateCounterAt(i) - 
histMap.initialUpdateCounterAt(i));
                 }
             }
+            else {
+                iter = sctx.iterator;
 
-            while ((sctx != null && newReq) || partIt.hasNext()) {
-                int part = sctx != null && newReq ? sctx.part : partIt.next();
+                remainingParts = sctx.remainingParts;
+            }
 
-                newReq = false;
+            final int messageMaxSize = grp.config().getRebalanceBatchSize();
 
-                GridDhtLocalPartition loc;
+            while (iter.hasNext()) {
+                if (s.messageSize() >= messageMaxSize) {
+                    if (++batchesCnt >= maxBatchesCnt) {
+                        saveSupplyContext(contextId,
+                            iter,
+                            remainingParts,
+                            d.rebalanceId()
+                        );
 
-                if (sctx != null && sctx.loc != null) {
-                    loc = sctx.loc;
+                        reply(node, d, s, contextId);
 
-                    assert loc.reservations() > 0;
+                        return;
+                    }
+                    else {
+                        if (!reply(node, d, s, contextId))
+                            return;
+
+                        s = new GridDhtPartitionSupplyMessage(d.rebalanceId(),
+                            grp.groupId(),
+                            d.topologyVersion(),
+                            grp.deploymentEnabled());
+                    }
                 }
-                else {
-                    loc = top.localPartition(part, d.topologyVersion(), false);
 
-                    if (loc == null || loc.state() != OWNING || 
!loc.reserve()) {
-                        // Reply with partition of "-1" to let sender know that
-                        // this node is no longer an owner.
-                        s.missed(part);
+                CacheDataRow row = iter.next();
 
-                        if (log.isDebugEnabled())
-                            log.debug("Requested partition is not owned by 
local node [part=" + part +
-                                ", demander=" + id + ']');
+                int part = row.partition();
 
-                        continue;
-                    }
+                GridDhtLocalPartition loc = top.localPartition(part, 
d.topologyVersion(), false);
+
+                assert (loc != null && loc.state() == OWNING && 
loc.reservations() > 0) || iter.isPartitionMissing(part) : loc;
+
+                if (iter.isPartitionMissing(part) && 
remainingParts.contains(part)) {
+                    s.missed(part);
+
+                    remainingParts.remove(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is marked as missing on 
local node [part=" + part +
+                            ", demander=" + nodeId + ']');
+
+                    continue;
                 }
 
-                try {
-                    boolean partMissing = false;
-
-                    if (phase == SupplyContextPhase.NEW)
-                        phase = SupplyContextPhase.OFFHEAP;
-
-                    if (phase == SupplyContextPhase.OFFHEAP) {
-                        IgniteRebalanceIterator iter;
-
-                        if (sctx == null || sctx.entryIt == null) {
-                            iter = grp.offheap().rebalanceIterator(part, 
d.topologyVersion(),
-                                d.isHistorical(part) ? 
d.partitionCounter(part) : null);
-
-                            if (!iter.historical())
-                                assert !grp.persistenceEnabled() || 
!d.isHistorical(part);
-                            else
-                                assert grp.persistenceEnabled() && 
d.isHistorical(part);
-                        }
-                        else
-                            iter = (IgniteRebalanceIterator)sctx.entryIt;
-
-                        while (iter.hasNext()) {
-                            List<ClusterNode> nodes = 
grp.affinity().cachedAffinity(d.topologyVersion()).get(part);
-
-                            if (!nodes.contains(node)) {
-                                // Demander no longer needs this partition,
-                                // so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need 
requested partition " +
-                                        "[part=" + part + ", nodeId=" + id + 
']');
-
-                                partMissing = true;
-
-                                if (sctx != null) {
-                                    sctx = new SupplyContext(
-                                        phase,
-                                        partIt,
-                                        null,
-                                        part,
-                                        loc,
-                                        d.updateSequence());
-                                }
-
-                                break;
-                            }
-
-                            if (s.messageSize() >= 
grp.config().getRebalanceBatchSize()) {
-                                if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId,
-                                        phase,
-                                        partIt,
-                                        part,
-                                        iter,
-                                        loc,
-                                        d.topologyVersion(),
-                                        d.updateSequence());
-
-                                    loc = null;
-
-                                    reply(node, d, s, scId);
-
-                                    return;
-                                }
-                                else {
-                                    if (!reply(node, d, s, scId))
-                                        return;
-
-                                    s = new 
GridDhtPartitionSupplyMessage(d.updateSequence(),
-                                        grp.groupId(),
-                                        d.topologyVersion(),
-                                        grp.deploymentEnabled());
-                                }
-                            }
-
-                            CacheDataRow row = iter.next();
-
-                            GridCacheEntryInfo info = new GridCacheEntryInfo();
-
-                            info.key(row.key());
-                            info.expireTime(row.expireTime());
-                            info.version(row.version());
-                            info.value(row.value());
-                            info.cacheId(row.cacheId());
-
-                            if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry0(part, info, grp.shared(), 
grp.cacheObjectContext());
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated 
to false (will not send " +
-                                        "cache entry): " + info);
-
-                                continue;
-                            }
-
-                            // Need to manually prepare cache message.
-// TODO GG-11141.
-//                                if (depEnabled && !prepared) {
-//                                    ClassLoader ldr = 
swapEntry.keyClassLoaderId() != null ?
-//                                        
cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-//                                        swapEntry.valueClassLoaderId() != 
null ?
-//                                            
cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-//                                            null;
-//
-//                                    if (ldr == null)
-//                                        continue;
-//
-//                                    if (ldr instanceof GridDeploymentInfo) {
-//                                        s.prepare((GridDeploymentInfo)ldr);
-//
-//                                        prepared = true;
-//                                    }
-//                                }
-                        }
-
-                        if (partMissing)
-                            continue;
-                    }
+                if (!remainingParts.contains(part))
+                    continue;
+
+                GridCacheEntryInfo info = new GridCacheEntryInfo();
 
-                    // Mark as last supply message.
+                info.key(row.key());
+                info.expireTime(row.expireTime());
+                info.version(row.version());
+                info.value(row.value());
+                info.cacheId(row.cacheId());
+
+                if (preloadPred == null || preloadPred.apply(info))
+                    s.addEntry0(part, info, grp.shared(), 
grp.cacheObjectContext());
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Rebalance predicate evaluated to false 
(will not send " +
+                            "cache entry): " + info);
+                }
+
+                if (iter.isPartitionDone(part)) {
                     s.last(part, loc.updateCounter());
 
-                    if (!d.isHistorical(part))
-                        s.clean(part);
+                    remainingParts.remove(part);
+                }
+
+                // Need to manually prepare cache message.
+                // TODO GG-11141.
+            }
+
+            Iterator<Integer> remainingIter = remainingParts.iterator();
+
+            while (remainingIter.hasNext()) {
+                int p = remainingIter.next();
+
+                if (iter.isPartitionDone(p)) {
+                    GridDhtLocalPartition loc = top.localPartition(p, 
d.topologyVersion(), false);
+
+                    assert loc != null;
 
-                    phase = SupplyContextPhase.NEW;
+                    s.last(p, loc.updateCounter());
 
-                    sctx = null;
+                    remainingIter.remove();
                 }
-                finally {
-                    if (loc != null)
-                        loc.release();
+                else if (iter.isPartitionMissing(p)) {
+                    s.missed(p);
+
+                    remainingIter.remove();
                 }
             }
 
-            reply(node, d, s, scId);
+            assert remainingParts.isEmpty();
+
+            if (sctx != null)
+                clearContext(sctx, log);
+            else
+                iter.close();
+
+            reply(node, d, s, contextId);
 
             if (log.isDebugEnabled())
                 log.debug("Finished supplying rebalancing [cache=" + 
grp.cacheOrGroupName() +
                     ", fromNode=" + node.id() +
-                    ", topology=" + d.topologyVersion() + ", updateSeq=" + 
d.updateSequence() +
-                    ", idx=" + idx + "]");
+                    ", topology=" + demTop + ", rebalanceId=" + 
d.rebalanceId() +
+                    ", topicId=" + topicId + "]");
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send partition supply message to node: " + 
id, e);
+            U.error(log, "Failed to send partition supply message to node: " + 
nodeId, e);
         }
         catch (IgniteSpiException e) {
             if (log.isDebugEnabled())
@@ -445,22 +413,25 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * @param n Node.
-     * @param d DemandMessage
+     * Sends supply message to demand node.
+     *
+     * @param node Recipient of supply message.
+     * @param d Demand message.
      * @param s Supply message.
+     * @param contextId Supply context id.
      * @return {@code True} if message was sent, {@code false} if recipient 
left grid.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reply(ClusterNode n,
+    private boolean reply(ClusterNode node,
         GridDhtPartitionDemandMessage d,
         GridDhtPartitionSupplyMessage s,
-        T3<UUID, Integer, AffinityTopologyVersion> scId)
+        T3<UUID, Integer, AffinityTopologyVersion> contextId)
         throws IgniteCheckedException {
         try {
             if (log.isDebugEnabled())
-                log.debug("Replying to partition demand [node=" + n.id() + ", 
demand=" + d + ", supply=" + s + ']');
+                log.debug("Replying to partition demand [node=" + node.id() + 
", demand=" + d + ", supply=" + s + ']');
 
-            grp.shared().io().sendOrderedMessage(n, d.topic(), s, 
grp.ioPolicy(), d.timeout());
+            grp.shared().io().sendOrderedMessage(node, d.topic(), s, 
grp.ioPolicy(), d.timeout());
 
             // Throttle preloading.
             if (grp.config().getRebalanceThrottle() > 0)
@@ -470,10 +441,10 @@ class GridDhtPartitionSupplier {
         }
         catch (ClusterTopologyCheckedException ignore) {
             if (log.isDebugEnabled())
-                log.debug("Failed to send partition supply message because 
node left grid: " + n.id());
+                log.debug("Failed to send partition supply message because 
node left grid: " + node.id());
 
             synchronized (scMap) {
-                clearContext(scMap.remove(scId), log);
+                clearContext(scMap.remove(contextId), log);
             }
 
             return false;
@@ -481,95 +452,50 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * @param t Tuple.
-     * @param phase Phase.
-     * @param partIt Partition it.
-     * @param part Partition.
-     * @param entryIt Entry it.
+     * Saves supply context with given parameters to {@code scMap}.
+     *
+     * @param contextId Supply context id.
+     * @param entryIt Entries rebalance iterator.
+     * @param remainingParts Set of partitions that weren't sent yet.
+     * @param rebalanceId Rebalance id.
      */
     private void saveSupplyContext(
-        T3<UUID, Integer, AffinityTopologyVersion> t,
-        SupplyContextPhase phase,
-        Iterator<Integer> partIt,
-        int part,
-        Iterator<?> entryIt,
-        GridDhtLocalPartition loc,
-        AffinityTopologyVersion topVer,
-        long updateSeq) {
+        T3<UUID, Integer, AffinityTopologyVersion> contextId,
+        IgniteRebalanceIterator entryIt,
+        Set<Integer> remainingParts,
+        long rebalanceId) {
         synchronized (scMap) {
-            if (grp.affinity().lastVersion().equals(topVer)) {
-                assert scMap.get(t) == null;
-
-                scMap.put(t,
-                    new SupplyContext(phase,
-                        partIt,
-                        entryIt,
-                        part,
-                        loc,
-                        updateSeq));
-            }
-            else if (loc != null) {
-                assert loc.reservations() > 0;
+            assert scMap.get(contextId) == null;
 
-                loc.release();
-            }
+            scMap.put(contextId, new SupplyContext(entryIt, remainingParts, 
rebalanceId));
         }
     }
 
     /**
-     * Supply context phase.
-     */
-    private enum SupplyContextPhase {
-        /** */
-        NEW,
-        /** */
-        OFFHEAP
-    }
-
-    /**
      * Supply context.
      */
     private static class SupplyContext {
-        /** Phase. */
-        private final SupplyContextPhase phase;
-
-        /** Partition iterator. */
+        /** Entries iterator. */
         @GridToStringExclude
-        private final Iterator<Integer> partIt;
+        private final IgniteRebalanceIterator iterator;
 
-        /** Entry iterator. */
-        @GridToStringExclude
-        private final Iterator<?> entryIt;
-
-        /** Partition. */
-        private final int part;
-
-        /** Local partition. */
-        private final GridDhtLocalPartition loc;
+        /** Set of partitions which weren't sent yet. */
+        private final Set<Integer> remainingParts;
 
-        /** Update seq. */
-        private final long updateSeq;
+        /** Rebalance id. */
+        private final long rebalanceId;
 
         /**
-         * @param phase Phase.
-         * @param partIt Partition iterator.
-         * @param loc Partition.
-         * @param updateSeq Update sequence.
-         * @param entryIt Entry iterator.
-         * @param part Partition.
+         * Constructor.
+         *
+         * @param iterator Entries rebalance iterator.
+         * @param remainingParts Set of partitions which weren't sent yet.
+         * @param rebalanceId Rebalance id.
          */
-        public SupplyContext(SupplyContextPhase phase,
-            Iterator<Integer> partIt,
-            Iterator<?> entryIt,
-            int part,
-            GridDhtLocalPartition loc,
-            long updateSeq) {
-            this.phase = phase;
-            this.partIt = partIt;
-            this.entryIt = entryIt;
-            this.part = part;
-            this.loc = loc;
-            this.updateSeq = updateSeq;
+        SupplyContext(IgniteRebalanceIterator iterator, Set<Integer> 
remainingParts, long rebalanceId) {
+            this.iterator = iterator;
+            this.remainingParts = remainingParts;
+            this.rebalanceId = rebalanceId;
         }
 
         /** {@inheritDoc} */
@@ -577,20 +503,4 @@ class GridDhtPartitionSupplier {
             return S.toString(SupplyContext.class, this);
         }
     }
-
-    /**
-     * Dumps debug information.
-     */
-    public void dumpDebugInfo() {
-        synchronized (scMap) {
-            if (!scMap.isEmpty()) {
-                U.warn(log, "Rebalancing supplier reserved following 
partitions:");
-
-                for (SupplyContext sc : scMap.values()) {
-                    if (sc.loc != null)
-                        U.warn(log, ">>> " + sc.loc);
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 90d11f5..4ae5acd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -49,8 +49,8 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Update sequence. */
-    private long updateSeq;
+    /** A unique (per demander) rebalance id. */
+    private long rebalanceId;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
@@ -84,17 +84,17 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
     private Map<Integer, Long> keysPerCache;
 
     /**
-     * @param updateSeq Update sequence for this node.
+     * @param rebalanceId Rebalance id.
      * @param grpId Cache group ID.
      * @param topVer Topology version.
      * @param addDepInfo Deployment info flag.
      */
-    GridDhtPartitionSupplyMessage(long updateSeq,
+    GridDhtPartitionSupplyMessage(long rebalanceId,
         int grpId,
         AffinityTopologyVersion topVer,
         boolean addDepInfo) {
         this.grpId = grpId;
-        this.updateSeq = updateSeq;
+        this.rebalanceId = rebalanceId;
         this.topVer = topVer;
         this.addDepInfo = addDepInfo;
     }
@@ -112,10 +112,10 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
     }
 
     /**
-     * @return Update sequence.
+     * @return Rebalance id.
      */
-    long updateSequence() {
-        return updateSeq;
+    long rebalanceId() {
+        return rebalanceId;
     }
 
     /**
@@ -328,7 +328,8 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeLong("updateSeq", updateSeq))
+                // Keep 'updateSeq' name for compatibility.
+                if (!writer.writeLong("updateSeq", rebalanceId))
                     return false;
 
                 writer.incrementState();
@@ -414,7 +415,8 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
                 reader.incrementState();
 
             case 11:
-                updateSeq = reader.readLong("updateSeq");
+                // Keep 'updateSeq' name for compatibility.
+                rebalanceId = reader.readLong("updateSeq");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b4febf7..ce888b7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2285,8 +2285,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (localReserved != null) {
                 Long localCntr = localReserved.get(p);
 
-                if (localCntr != null && localCntr <= minCntr &&
-                    maxCntrObj.nodes.contains(cctx.localNodeId())) {
+                if (localCntr != null && localCntr <= minCntr && 
maxCntrObj.nodes.contains(cctx.localNodeId())) {
                     partHistSuppliers.put(cctx.localNodeId(), top.groupId(), 
p, minCntr);
 
                     haveHistory.add(p);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0b499fb..6ec6ad3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -22,14 +22,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
@@ -41,7 +37,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -51,15 +46,13 @@ import 
org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
@@ -90,12 +83,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
 
     /** */
-    private final ConcurrentHashMap<Integer, GridDhtLocalPartition> 
partsToEvict = new ConcurrentHashMap<>();
-
-    /** */
-    private final AtomicInteger partsEvictOwning = new AtomicInteger();
-
-    /** */
     private boolean stopped;
 
     /**
@@ -178,7 +165,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture 
exchFut) {
+    @Override public GridDhtPreloaderAssignments 
generateAssignments(GridDhtPartitionExchangeId exchId, 
GridDhtPartitionsExchangeFuture exchFut) {
         assert exchFut == null || exchFut.isDone();
 
         // No assignments for disabled preloader.
@@ -196,19 +183,21 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
             ", grp=" + grp.name() +
             ", topVer=" + top.readyTopologyVersion() + ']';
 
-        GridDhtPreloaderAssignments assigns = new 
GridDhtPreloaderAssignments(exchId, topVer);
+        GridDhtPreloaderAssignments assignments = new 
GridDhtPreloaderAssignments(exchId, topVer);
 
         AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
+        CachePartitionFullCountersMap cntrMap = top.fullUpdateCounters();
+
         for (int p = 0; p < partCnt; p++) {
             if (ctx.exchange().hasPendingExchange()) {
                 if (log.isDebugEnabled())
                     log.debug("Skipping assignments creation, exchange worker 
has pending assignments: " +
                         exchId);
 
-                assigns.cancelled(true);
+                assignments.cancelled(true);
 
-                return assigns;
+                return assignments;
             }
 
             // If partition belongs to local node.
@@ -218,6 +207,27 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                 assert part != null;
                 assert part.id() == p;
 
+                // Do not rebalance OWNING or LOST partitions.
+                if (part.state() == OWNING || part.state() == LOST)
+                    continue;
+
+                // If partition is currently rented prevent destroy and start 
clearing process.
+                if (part.state() == RENTING) {
+                    if (part.reserve()) {
+                        part.moving();
+                        part.clearAsync();
+
+                        part.release();
+                    }
+                }
+
+                // If partition was destroyed recreate it.
+                if (part.state() == EVICTED) {
+                    part = top.localPartition(p, topVer, true);
+                }
+
+                assert part != null && part.state() == MOVING : "Partition has 
invalid state for rebalance " + part;
+
                 ClusterNode histSupplier = null;
 
                 if (grp.persistenceEnabled() && exchFut != null) {
@@ -228,65 +238,23 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                 }
 
                 if (histSupplier != null) {
-                    if (part.state() != MOVING) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping partition assignment (state is 
not MOVING): " + part);
-
-                        continue; // For.
-                    }
-
                     assert grp.persistenceEnabled();
                     assert remoteOwners(p, topVer).contains(histSupplier) : 
remoteOwners(p, topVer);
 
-                    GridDhtPartitionDemandMessage msg = 
assigns.get(histSupplier);
+                    GridDhtPartitionDemandMessage msg = 
assignments.get(histSupplier);
 
                     if (msg == null) {
-                        assigns.put(histSupplier, msg = new 
GridDhtPartitionDemandMessage(
+                        assignments.put(histSupplier, msg = new 
GridDhtPartitionDemandMessage(
                             top.updateSequence(),
-                            assigns.topologyVersion(),
-                            grp.groupId()));
+                            assignments.topologyVersion(),
+                            grp.groupId())
+                        );
                     }
 
-                    msg.addPartition(p, true);
+                    msg.partitions().addHistorical(p, 
cntrMap.initialUpdateCounter(p), cntrMap.updateCounter(p), partCnt);
                 }
                 else {
-                    if (grp.persistenceEnabled()) {
-                        if (part.state() == RENTING || part.state() == 
EVICTED) {
-                            IgniteInternalFuture<?> rentFut = part.rent(false);
-
-                            while (true) {
-                                try {
-                                    rentFut.get(20, TimeUnit.SECONDS);
-
-                                    break;
-                                }
-                                catch (IgniteFutureTimeoutCheckedException 
ignore) {
-                                    // Continue.
-                                    U.warn(log, "Still waiting for partition 
eviction: " + part);
-
-                                    part.tryEvictAsync(false);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Error while clearing 
outdated local partition", e);
-
-                                    break;
-                                }
-                            }
-
-                            part = top.localPartition(p, topVer, true);
-
-                            assert part != null;
-                        }
-                    }
-
-                    if (part.state() != MOVING) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping partition assignment (state is 
not MOVING): " + part);
-
-                        continue; // For.
-                    }
-
-                    Collection<ClusterNode> picked = pickedOwners(p, topVer);
+                    Collection<ClusterNode> picked = pickOwners(p, topVer);
 
                     if (picked.isEmpty()) {
                         top.own(part);
@@ -305,22 +273,22 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                     else {
                         ClusterNode n = F.rand(picked);
 
-                        GridDhtPartitionDemandMessage msg = assigns.get(n);
+                        GridDhtPartitionDemandMessage msg = assignments.get(n);
 
-                    if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                            top.updateSequence(),
-                            assigns.topologyVersion(),
-                            grp.groupId()));
-                    }
+                        if (msg == null) {
+                            assignments.put(n, msg = new 
GridDhtPartitionDemandMessage(
+                                top.updateSequence(),
+                                assignments.topologyVersion(),
+                                grp.groupId()));
+                        }
 
-                        msg.addPartition(p, false);
+                        msg.partitions().addFull(p);
                     }
                 }
             }
         }
 
-        return assigns;
+        return assignments;
     }
 
     /** {@inheritDoc} */
@@ -329,11 +297,13 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /**
+     * Picks owners for specified partition {@code p} from affinity.
+     *
      * @param p Partition.
      * @param topVer Topology version.
      * @return Picked owners.
      */
-    private Collection<ClusterNode> pickedOwners(int p, 
AffinityTopologyVersion topVer) {
+    private Collection<ClusterNode> pickOwners(int p, AffinityTopologyVersion 
topVer) {
         Collection<ClusterNode> affNodes = 
grp.affinity().cachedAffinity(topVer).get(p);
 
         int affCnt = affNodes.size();
@@ -355,6 +325,8 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /**
+     * Returns remote owners (excluding local node) for specified partition 
{@code p}.
+     *
      * @param p Partition.
      * @param topVer Topology version.
      * @return Nodes owning this partition.
@@ -400,11 +372,11 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     @Override public Runnable addAssignments(
         GridDhtPreloaderAssignments assignments,
         boolean forceRebalance,
-        int cnt,
+        long rebalanceId,
         Runnable next,
         @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut
     ) {
-        return demander.addAssignments(assignments, forceRebalance, cnt, next, 
forcedRebFut);
+        return demander.addAssignments(assignments, forceRebalance, 
rebalanceId, next, forcedRebFut);
     }
 
     /**
@@ -573,73 +545,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
-        partsToEvict.putIfAbsent(part.id(), part);
-
-        if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 
1)) {
-            ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
-                @Override public Boolean call() {
-                    boolean locked = true;
-
-                    while (locked || !partsToEvict.isEmpty()) {
-                        if (!locked && !partsEvictOwning.compareAndSet(0, 1))
-                            return false;
-
-                        try {
-                            for (GridDhtLocalPartition part : 
partsToEvict.values()) {
-                                try {
-                                    partsToEvict.remove(part.id());
-
-                                    part.tryEvict();
-
-                                    GridDhtPartitionState state = part.state();
-
-                                    if (state == RENTING || ((state == MOVING 
|| state == OWNING) && part.shouldBeRenting()))
-                                        partsToEvict.put(part.id(), part);
-                                }
-                                catch (Throwable ex) {
-                                    if (ctx.kernalContext().isStopping()) {
-                                        LT.warn(log, ex, "Partition eviction 
failed (current node is stopping).",
-                                            false,
-                                            true);
-
-                                        partsToEvict.clear();
-
-                                        return true;
-                                    }
-                                    else
-                                        LT.error(log, ex, "Partition eviction 
failed, this can cause grid hang.");
-                                }
-                            }
-                        }
-                        finally {
-                            if (!partsToEvict.isEmpty()) {
-                                if (ctx.kernalContext().isStopping()) {
-                                    partsToEvict.clear();
-
-                                    locked = false;
-                                }
-                                else
-                                    locked = true;
-                            }
-                            else {
-                                boolean res = 
partsEvictOwning.compareAndSet(1, 0);
-
-                                assert res;
-
-                                locked = false;
-                            }
-                        }
-                    }
-
-                    return true;
-                }
-            }, /*system pool*/ true);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public void dumpDebugInfo() {
-        supplier.dumpDebugInfo();
+        // No-op
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
new file mode 100644
index 0000000..d829b53
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Map of partitions demanded during rebalancing.
+ */
+public class IgniteDhtDemandedPartitionsMap implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Map of partitions that will be preloaded from history. (partId -> 
(fromCntr, toCntr)). */
+    private CachePartitionPartialCountersMap historical;
+
+    /** Set of partitions that will be preloaded from all its current data. */
+    private Set<Integer> full;
+
+    public IgniteDhtDemandedPartitionsMap(
+        @Nullable CachePartitionPartialCountersMap historical,
+        @Nullable Set<Integer> full)
+    {
+        this.historical = historical;
+        this.full = full;
+    }
+
+    public IgniteDhtDemandedPartitionsMap() {
+    }
+
+    /**
+     * Adds partition for preloading from history.
+     *
+     * @param partId Partition ID.
+     * @param from First demanded counter.
+     * @param to Last demanded counter.
+     * @param partCnt Maximum possible partition count.
+     */
+    public void addHistorical(int partId, long from, long to, int partCnt) {
+        assert !hasFull(partId);
+
+        if (historical == null)
+            historical = new CachePartitionPartialCountersMap(partCnt);
+
+        historical.add(partId, from, to);
+    }
+
+    /**
+     * Adds partition for preloading from all current data.
+     * @param partId Partition ID.
+     */
+    public void addFull(int partId) {
+        assert !hasHistorical(partId);
+
+        if (full == null)
+            full = new HashSet<>();
+
+        full.add(partId);
+    }
+
+    /**
+     * Removes partition.
+     * @param partId Partition ID.
+     * @return {@code True} if changed.
+     */
+    public boolean remove(int partId) {
+        assert !(hasFull(partId) && hasHistorical(partId));
+
+        if (full != null && full.remove(partId))
+            return true;
+
+        if (historical != null && historical.remove(partId))
+            return true;
+
+        return false;
+    }
+
+    /** */
+    public boolean hasPartition(int partId) {
+        return hasHistorical(partId) || hasFull(partId);
+    }
+
+    /** */
+    public boolean hasHistorical() {
+        return historical != null && !historical.isEmpty();
+    }
+
+    /** */
+    public boolean hasHistorical(int partId) {
+        return historical != null && historical.contains(partId);
+    }
+
+    /** */
+    public boolean hasFull() {
+        return full != null && !full.isEmpty();
+    }
+
+    /** */
+    public boolean hasFull(int partId) {
+        return full != null && full.contains(partId);
+    }
+
+    /** */
+    public boolean isEmpty() {
+        return !hasFull() && !hasHistorical();
+    }
+
+    /** */
+    public int size() {
+        int histSize = historical != null ? historical.size() : 0;
+        int fullSize = full != null ? full.size() : 0;
+
+        return histSize + fullSize;
+    }
+
+    /** */
+    public CachePartitionPartialCountersMap historicalMap() {
+        if (historical == null)
+            return CachePartitionPartialCountersMap.EMPTY;
+
+        return historical;
+    }
+
+    /** */
+    public Set<Integer> fullSet() {
+        if (full == null)
+            return Collections.emptySet();
+
+        return Collections.unmodifiableSet(full);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDhtDemandedPartitionsMap.class, this);
+    }
+
+    /**
+     * @return String representation of partitions list.
+     */
+    String partitionsList() {
+        List<Integer> s = new ArrayList<>(size());
+
+        s.addAll(fullSet());
+
+        for (int i = 0; i < historicalMap().size(); i++) {
+            int p = historicalMap().partitionAt(i);
+
+            assert !s.contains(p);
+
+            s.add(p);
+        }
+
+        Collections.sort(s);
+
+        StringBuilder sb = new StringBuilder();
+
+        int start = -1;
+
+        int prev = -1;
+
+        Iterator<Integer> sit = s.iterator();
+
+        while (sit.hasNext()) {
+            int p = sit.next();
+
+            if (start == -1) {
+                start = p;
+                prev = p;
+            }
+
+            if (prev < p - 1) {
+                sb.append(start);
+
+                if (start != prev)
+                    sb.append("-").append(prev);
+
+                sb.append(", ");
+
+                start = p;
+            }
+
+            if (!sit.hasNext()) {
+                sb.append(start);
+
+                if (start != p)
+                    sb.append("-").append(p);
+            }
+
+            prev = p;
+        }
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index dc2fbf8..4a22abd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Partition counters map.
@@ -68,4 +69,9 @@ public class IgniteDhtPartitionCountersMap implements 
Serializable {
 
         return cntrMap;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDhtPartitionCountersMap.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
index 1c69374..f8da6a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -107,4 +108,9 @@ public class IgniteDhtPartitionHistorySuppliersMap 
implements Serializable {
     public synchronized void putAll(IgniteDhtPartitionHistorySuppliersMap 
that) {
         map = that.map;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDhtPartitionHistorySuppliersMap.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
index 37ca7e4..7066e0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Partition reload map.
@@ -92,4 +93,9 @@ public class IgniteDhtPartitionsToReloadMap implements 
Serializable {
     public boolean isEmpty() {
         return map == null || map.isEmpty();
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDhtPartitionsToReloadMap.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIterator.java
new file mode 100644
index 0000000..7ed16af
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+
+/**
+ * Iterator that provides history of updates for a subset of partitions.
+ */
+public interface IgniteHistoricalIterator extends 
GridCloseableIterator<CacheDataRow> {
+    /**
+     * @param partId Partition ID.
+     * @return {@code True} if iterator contains data for given partition.
+     */
+    public boolean contains(int partId);
+
+    /**
+     * @param partId Partition ID.
+     * @return {@code True} if all data for given partition has already been 
returned.
+     */
+    public boolean isDone(int partId);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java
new file mode 100644
index 0000000..87ba3a9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteRebalanceIteratorImpl.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Default iterator for rebalancing.
+ */
+public class IgniteRebalanceIteratorImpl implements IgniteRebalanceIterator {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Iterators for full preloading, ordered by partition ID. */
+    @Nullable private final NavigableMap<Integer, 
GridCloseableIterator<CacheDataRow>> fullIterators;
+
+    /** Iterator for historical preloading. */
+    @Nullable private final IgniteHistoricalIterator historicalIterator;
+
+    /** Partitions marked as missing. */
+    private final Set<Integer> missingParts = new HashSet<>();
+
+    /** Current full iterator. */
+    private Map.Entry<Integer, GridCloseableIterator<CacheDataRow>> current;
+
+    /** */
+    private boolean reachedEnd;
+
+    /** */
+    private boolean closed;
+
+    /**
+     * @param fullIterators Iterators for full preloading, ordered by partition ID.
+     * @param historicalIterator Iterator for historical preloading ({@code null} if none).
+     * @throws IgniteCheckedException If failed to advance to the first element.
+     */
+    public IgniteRebalanceIteratorImpl(
+        NavigableMap<Integer, GridCloseableIterator<CacheDataRow>> 
fullIterators,
+        IgniteHistoricalIterator historicalIterator) throws 
IgniteCheckedException {
+        this.fullIterators = fullIterators;
+        this.historicalIterator = historicalIterator;
+
+        advance();
+    }
+
+    /** */
+    private synchronized void advance() throws IgniteCheckedException {
+        if (fullIterators.isEmpty())
+            reachedEnd = true;
+
+        while (!reachedEnd && (current == null || 
!current.getValue().hasNextX() || missingParts.contains(current.getKey()))) {
+            if (current == null)
+                current = fullIterators.firstEntry();
+            else {
+                current = fullIterators.ceilingEntry(current.getKey() + 1);
+
+                if (current == null)
+                    reachedEnd = true;
+            }
+        }
+
+        assert current != null || reachedEnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean historical(int partId) {
+        return historicalIterator != null && 
historicalIterator.contains(partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean isPartitionDone(int partId) {
+        if (missingParts.contains(partId))
+            return false;
+
+        if (historical(partId))
+            return historicalIterator.isDone(partId);
+
+        return current == null || current.getKey() > partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean isPartitionMissing(int partId) {
+        return missingParts.contains(partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void setPartitionMissing(int partId) {
+        missingParts.add(partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean hasNextX() throws 
IgniteCheckedException {
+        if (historicalIterator != null && historicalIterator.hasNextX())
+            return true;
+
+        return current != null && current.getValue().hasNextX();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized CacheDataRow nextX() throws 
IgniteCheckedException {
+        if (historicalIterator != null && historicalIterator.hasNextX())
+            return historicalIterator.nextX();
+
+        if (current == null || !current.getValue().hasNextX())
+            throw new NoSuchElementException();
+
+        CacheDataRow result = current.getValue().nextX();
+
+        assert result.partition() == current.getKey();
+
+        advance();
+
+        return result;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void removeX() throws IgniteCheckedException 
{
+        throw new UnsupportedOperationException("remove");
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IgniteCheckedException {
+        if (historicalIterator != null)
+            historicalIterator.close();
+
+        if (fullIterators != null) {
+            for (GridCloseableIterator<CacheDataRow> iter : 
fullIterators.values())
+                iter.close();
+        }
+
+        closed = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean isClosed() {
+        return closed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Iterator<CacheDataRow> iterator() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean hasNext() {
+        try {
+            return hasNextX();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized CacheDataRow next() {
+        try {
+            return nextX();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove() {
+        try {
+            removeX();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index befe062..eb0c30f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -46,10 +47,12 @@ import 
org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
-import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
 import 
org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -62,6 +65,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
@@ -569,6 +573,43 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
     }
 
+    /**
+     * Destroys the given {@code store} and creates a new one with the same update counters as the given store.
+     *
+     * @param store Store to destroy.
+     * @return New cache data store.
+     * @throws IgniteCheckedException If failed.
+     */
+    public CacheDataStore recreateCacheDataStore(CacheDataStore store) throws 
IgniteCheckedException {
+        long updCounter = store.updateCounter();
+        long initUpdCounter = store.initialUpdateCounter();
+
+        int p = store.partId();
+
+        PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
+
+        int tag = pageMemory.invalidate(grp.groupId(), p);
+
+        ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
+
+        CacheDataStore store0;
+
+        partStoreLock.lock(p);
+
+        try {
+            store0 = createCacheDataStore0(p);
+            store0.updateCounter(updCounter);
+            store0.updateInitialCounter(initUpdCounter);
+
+            partDataStores.put(p, store0);
+        }
+        finally {
+            partStoreLock.unlock(p);
+        }
+
+        return store0;
+    }
+
     /** {@inheritDoc} */
     @Override public void onPartitionCounterUpdated(int part, long cntr) {
         CacheDataStore store = partDataStores.get(part);
@@ -691,32 +732,31 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteRebalanceIterator rebalanceIterator(int part, 
AffinityTopologyVersion topVer,
-        Long partCntrSince) throws IgniteCheckedException {
-        if (partCntrSince == null)
-            return super.rebalanceIterator(part, topVer, partCntrSince);
+    @Override @Nullable protected IgniteHistoricalIterator historicalIterator(
+        CachePartitionPartialCountersMap partCntrs) throws 
IgniteCheckedException {
+        if (partCntrs == null || partCntrs.isEmpty())
+            return null;
 
-        GridCacheDatabaseSharedManager database = 
(GridCacheDatabaseSharedManager)ctx.database();
+        GridCacheDatabaseSharedManager database = 
(GridCacheDatabaseSharedManager)grp.shared().database();
 
-        try {
-            WALPointer startPtr = 
database.searchPartitionCounter(grp.groupId(), part, partCntrSince);
+        FileWALPointer minPtr = null;
 
-            if (startPtr == null) {
-                assert false : "partCntr=" + partCntrSince + ", reservations=" 
+ S.toString(Map.class, database.reservedForPreloading());
+        for (int i = 0; i < partCntrs.size(); i++) {
+            int p = partCntrs.partitionAt(i);
+            long initCntr = partCntrs.initialUpdateCounterAt(i);
 
-                return super.rebalanceIterator(part, topVer, partCntrSince);
-            }
+            FileWALPointer startPtr = 
(FileWALPointer)database.searchPartitionCounter(grp.groupId(), p, initCntr);
 
-            WALIterator it = ctx.wal().replay(startPtr);
+            if (startPtr == null)
+                throw new IgniteCheckedException("Could not find start pointer 
for partition [part=" + p + ", partCntrSince=" + initCntr + "]");
 
-            return new RebalanceIteratorAdapter(grp, it, part);
+            if (minPtr == null || startPtr.compareTo(minPtr) == -1)
+                minPtr = startPtr;
         }
-        catch (IgniteCheckedException e) {
-            U.warn(log, "Failed to create WAL-based rebalance iterator (a full 
partition will transferred to a " +
-                "remote node) [part=" + part + ", partCntrSince=" + 
partCntrSince + ", err=" + e + ']');
 
-            return super.rebalanceIterator(part, topVer, partCntrSince);
-        }
+        WALIterator it = grp.shared().wal().replay(minPtr);
+
+        return new WALIteratorAdapter(grp, partCntrs, it);
     }
 
     /**
@@ -744,18 +784,24 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     /**
      *
      */
-    private static class RebalanceIteratorAdapter implements 
IgniteRebalanceIterator {
-        /** Serial version uid. */
+    private static class WALIteratorAdapter implements 
IgniteHistoricalIterator {
+        /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache group caches. */
-        private final Set<Integer> cacheGrpCaches;
+        /** Cache group context. */
+        private final CacheGroupContext grp;
 
-        /** WAL iterator. */
-        private final WALIterator walIt;
+        /** Partition counters map. */
+        private final CachePartitionPartialCountersMap partMap;
+
+        /** Partitions marked as done. */
+        private final Set<Integer> doneParts = new HashSet<>();
+
+        /** Cache IDs. This collection is stored as a field to avoid re-calculation on each iteration. */
+        private final Set<Integer> cacheIds;
 
-        /** Partition to scan. */
-        private final int part;
+        /** WAL iterator. */
+        private WALIterator walIt;
 
         /** */
         private Iterator<DataEntry> entryIt;
@@ -764,21 +810,27 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         private CacheDataRow next;
 
         /**
-         * @param grp Cache group.
+         * @param grp Cache group context.
          * @param walIt WAL iterator.
-         * @param part Partition ID.
          */
-        private RebalanceIteratorAdapter(CacheGroupContext grp, WALIterator 
walIt, int part) {
-            this.cacheGrpCaches = grp.cacheIds();
+        private WALIteratorAdapter(CacheGroupContext grp, 
CachePartitionPartialCountersMap partMap, WALIterator walIt) {
+            this.grp = grp;
+            this.partMap = partMap;
             this.walIt = walIt;
-            this.part = part;
+
+            cacheIds = grp.cacheIds();
 
             advance();
         }
 
         /** {@inheritDoc} */
-        @Override public boolean historical() {
-            return true;
+        @Override public boolean contains(int partId) {
+            return partMap.contains(partId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isDone(int partId) {
+            return doneParts.contains(partId);
         }
 
         /** {@inheritDoc} */
@@ -844,11 +896,23 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                     while (entryIt.hasNext()) {
                         DataEntry entry = entryIt.next();
 
-                        if (entry.partitionId() == part && 
cacheGrpCaches.contains(entry.cacheId())) {
+                        if (cacheIds.contains(entry.cacheId())) {
+                            int idx = 
partMap.partitionIndex(entry.partitionId());
 
-                            next = new DataEntryRow(entry);
+                            if (idx < 0)
+                                continue;
 
-                            return;
+                            long from = partMap.initialUpdateCounterAt(idx);
+                            long to = partMap.updateCounterAt(idx);
+
+                            if (entry.partitionCounter() >= from && 
entry.partitionCounter() <= to) {
+                                if (entry.partitionCounter() == to)
+                                    doneParts.add(entry.partitionId());
+
+                                next = new DataEntryRow(entry);
+
+                                return;
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index ae7e214..23d3941 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -228,7 +228,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
 
                 if (cache.lostPartitions().contains(i)) {
                     if (safe)
-                        fail("Reading from a lost partition should have 
failed: " + i);
+                        fail("Reading from a lost partition should have 
failed: " + i + " " + ig.name());
                     // else we could have read anything.
                 }
                 else

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CachePartitionPartialCountersMapSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CachePartitionPartialCountersMapSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CachePartitionPartialCountersMapSelfTest.java
new file mode 100644
index 0000000..a4afbca
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CachePartitionPartialCountersMapSelfTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+public class CachePartitionPartialCountersMapSelfTest extends 
GridCommonAbstractTest {
+
+    public void testAddAndRemove() throws Exception {
+        CachePartitionPartialCountersMap map = new 
CachePartitionPartialCountersMap(10);
+
+        for (int p = 0; p < 10; p++)
+            map.add(p, 2 * p, 3 * p);
+
+        for (int p = 0; p < 10; p++) {
+            assertEquals(p, map.partitionAt(p));
+            assertEquals(2 * p, map.initialUpdateCounterAt(p));
+            assertEquals(3 * p, map.updateCounterAt(p));
+        }
+
+        map.remove(3);
+        map.remove(11);
+        map.remove(7);
+
+        assertEquals(8, map.size());
+
+        int idx = 0;
+
+        for (int p = 0; p < 10; p++) {
+            if (p == 3 || p == 10 || p == 7)
+                continue;
+
+            assertEquals(p, map.partitionAt(idx));
+            assertEquals(2 * p, map.initialUpdateCounterAt(idx));
+            assertEquals(3 * p, map.updateCounterAt(idx));
+
+            idx++;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index f6667f3..ed51cf3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -149,7 +149,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
 
         iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, 
cacheRCfg2);
 
-        iCfg.setRebalanceThreadPoolSize(2);
+        iCfg.setRebalanceThreadPoolSize(3);
 
         return iCfg;
     }
@@ -507,6 +507,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
                     cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
                     cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
                     cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+                    cacheRCfg.setRebalanceBatchesPrefetchCount(1);
 
                     grid(0).getOrCreateCache(cacheRCfg);
 

Reply via email to