http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
deleted file mode 100644
index 845d3ed..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
-
-/**
- * Reservation mechanism for multiple partitions allowing to do a reservation 
in one operation.
- */
-public class GridDhtPartitionsReservation implements GridReservable {
-    /** */
-    private static final GridDhtLocalPartition[] EMPTY = {};
-
-    /** */
-    private static final CI1<GridDhtPartitionsReservation> NO_OP = new 
CI1<GridDhtPartitionsReservation>() {
-        @Override public void apply(GridDhtPartitionsReservation 
gridDhtPartitionsReservation) {
-            throw new IllegalStateException();
-        }
-    };
-
-    /** */
-    private final Object appKey;
-
-    /** */
-    private final GridCacheContext<?,?> cctx;
-
-    /** */
-    private final AffinityTopologyVersion topVer;
-
-    /** */
-    private final AtomicReference<GridDhtLocalPartition[]> parts = new 
AtomicReference<>();
-
-    /** */
-    private final AtomicReference<CI1<GridDhtPartitionsReservation>> unpublish 
= new AtomicReference<>();
-
-    /** */
-    private final AtomicInteger reservations = new AtomicInteger();
-
-    /**
-     * @param topVer AffinityTopologyVersion version.
-     * @param cctx Cache context.
-     * @param appKey Application key for reservation.
-     */
-    public GridDhtPartitionsReservation(AffinityTopologyVersion topVer, 
GridCacheContext<?,?> cctx, Object appKey) {
-        assert topVer != null;
-        assert cctx != null;
-        assert appKey != null;
-
-        this.topVer = topVer;
-        this.cctx = cctx;
-        this.appKey = appKey;
-    }
-
-    /**
-     * Registers all the given partitions for this reservation.
-     *
-     * @param parts Partitions.
-     * @return {@code true} If registration succeeded and this reservation can 
be published.
-     */
-    public boolean register(Collection<? extends GridReservable> parts) {
-        assert !F.isEmpty(parts) : "empty partitions list";
-
-        GridDhtLocalPartition[] arr = new GridDhtLocalPartition[parts.size()];
-
-        int i = 0;
-        int prevPart = -1;
-        boolean sorted = true; // Most probably it is a sorted list.
-
-        for (GridReservable part : parts) {
-            arr[i] = (GridDhtLocalPartition)part;
-
-            if (sorted) { // Make sure it will be a sorted array.
-                int id = arr[i].id();
-
-                if (id <= prevPart)
-                    sorted = false;
-
-                prevPart = id;
-            }
-
-            i++;
-        }
-
-        if (!sorted)
-            Arrays.sort(arr);
-
-        i = 0;
-        prevPart = -1;
-
-        // Register in correct sort order.
-        for (GridDhtLocalPartition part : arr) {
-            if (prevPart == part.id())
-                throw new IllegalStateException("Duplicated partitions.");
-
-            prevPart = part.id();
-
-            if (!part.addReservation(this)) {
-                if (i != 0)
-                    throw new IllegalStateException(
-                        "Trying to reserve different sets of partitions for 
the same topology version.");
-
-                return false;
-            }
-
-            i++;
-        }
-
-        if (!this.parts.compareAndSet(null, arr))
-            throw new IllegalStateException("Partitions can be registered only 
once.");
-
-        assert reservations.get() != -1 : "all the partitions must be reserved 
before register, we can't be invalidated";
-
-        return true;
-    }
-
-    /**
-     * Must be called when this reservation is published.
-     *
-     * @param unpublish Closure to unpublish this reservation when it will 
become invalid.
-     */
-    public void onPublish(CI1<GridDhtPartitionsReservation> unpublish) {
-        assert unpublish != null;
-
-        if (!this.unpublish.compareAndSet(null, unpublish))
-            throw new IllegalStateException("Unpublishing closure can be set 
only once.");
-
-        if (reservations.get() == -1)
-            unregister();
-    }
-
-    /**
-     * Reserves all the registered partitions.
-     *
-     * @return {@code true} If succeeded.
-     */
-    @Override public boolean reserve() {
-        assert parts.get() != null : "partitions must be registered before the 
first reserve attempt";
-
-        for (;;) {
-            int r = reservations.get();
-
-            if (r == -1) // Invalidated.
-                return false;
-
-            assert r >= 0 : r;
-
-            if (reservations.compareAndSet(r, r + 1))
-                return true;
-        }
-    }
-
-    /**
-     * @param parts Partitions.
-     */
-    private static void tryEvict(GridDhtLocalPartition[] parts) {
-        if (parts == null)  // Can be not initialized yet.
-            return ;
-
-        for (GridDhtLocalPartition part : parts)
-            tryEvict(part);
-    }
-
-    /**
-     * @param part Partition.
-     */
-    private static void tryEvict(GridDhtLocalPartition part) {
-        if (part.state() == RENTING && part.reservations() == 0)
-            part.tryContinueClearing();
-    }
-
-    /**
-     * Releases all the registered partitions.
-     */
-    @Override public void release() {
-        for (;;) {
-            int r = reservations.get();
-
-            if (r <= 0)
-                throw new IllegalStateException("Method 'reserve' must be 
called before 'release'.");
-
-            if (reservations.compareAndSet(r, r - 1)) {
-                // If it was the last reservation and topology version changed 
-> attempt to evict partitions.
-                if (r == 1 && !cctx.kernalContext().isStopping() &&
-                    
!topVer.equals(cctx.topology().lastTopologyChangeVersion()))
-                    tryEvict(parts.get());
-
-                return;
-            }
-        }
-    }
-
-    /**
-     * Unregisters from all the partitions and unpublishes this reservation.
-     */
-    private void unregister() {
-        GridDhtLocalPartition[] arr = parts.get();
-
-        // Unregister from partitions.
-        if (!F.isEmpty(arr) && parts.compareAndSet(arr, EMPTY)) {
-            // Reverse order makes sure that addReservation on the same topVer
-            // reservation will fail on the first partition.
-            for (int i = arr.length - 1; i >= 0; i--) {
-                GridDhtLocalPartition part = arr[i];
-
-                part.removeReservation(this);
-            }
-        }
-
-        // Unpublish.
-        CI1<GridDhtPartitionsReservation> u = unpublish.get();
-
-        if (u != null && u != NO_OP && unpublish.compareAndSet(u, NO_OP))
-            u.apply(this);
-    }
-
-    /**
-     * Must be checked in {@link 
GridDhtLocalPartition#tryClear(EvictionContext)}.
-     * If returns {@code true} this reservation object becomes invalid and 
partitions
-     * can be evicted or at least cleared.
-     * Also this means that after returning {@code true} here method {@link 
#reserve()} can not
-     * return {@code true} anymore.
-     *
-     * @return {@code true} If this reservation was successfully invalidated 
because it was not
-     *          reserved and partitions can be evicted.
-     */
-    public boolean invalidate() {
-        assert parts.get() != null : "all parts must be reserved before 
registration";
-
-        int r = reservations.get();
-
-        assert r >= -1 : r;
-
-        if (r != 0)
-            return r == -1;
-
-        if (reservations.compareAndSet(0, -1)) {
-            unregister();
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o;
-
-        return cctx == that.cctx && topVer.equals(that.topVer) && 
appKey.equals(that.appKey);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        String name = cctx.name();
-
-        int result = name == null ? 0 : name.hashCode();
-
-        result = 31 * result + appKey.hashCode();
-        result = 31 * result + topVer.hashCode();
-
-        return result;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
deleted file mode 100644
index 3b99758..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-
-/**
- * Class to validate partitions update counters and cache sizes during 
exchange process.
- */
-public class GridDhtPartitionsStateValidator {
-    /** Version since node is able to send cache sizes in {@link 
GridDhtPartitionsSingleMessage}. */
-    private static final IgniteProductVersion SIZES_VALIDATION_AVAILABLE_SINCE 
= IgniteProductVersion.fromString("2.5.0");
-
-    /** Cache shared context. */
-    private final GridCacheSharedContext<?, ?> cctx;
-
-    /**
-     * Constructor.
-     *
-     * @param cctx Cache shared context.
-     */
-    public GridDhtPartitionsStateValidator(GridCacheSharedContext<?, ?> cctx) {
-        this.cctx = cctx;
-    }
-
-    /**
-     * Validates partition states - update counters and cache sizes for all 
nodes.
-     * If update counter value or cache size for the same partitions are 
different on some nodes
-     * method throws exception with full information about inconsistent 
partitions.
-     *
-     * @param fut Current exchange future.
-     * @param top Topology to validate.
-     * @param messages Single messages received from all nodes.
-     * @throws IgniteCheckedException If validation failed. Exception message 
contains
-     * full information about all partitions which update counters or cache 
sizes are not consistent.
-     */
-    public void 
validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture fut,
-                                                  GridDhtPartitionTopology top,
-                                                  Map<UUID, 
GridDhtPartitionsSingleMessage> messages) throws IgniteCheckedException {
-        final Set<UUID> ignoringNodes = new HashSet<>();
-
-        // Ignore just joined nodes.
-        for (DiscoveryEvent evt : fut.events().events())
-            if (evt.type() == EVT_NODE_JOINED)
-                ignoringNodes.add(evt.eventNode().id());
-
-        AffinityTopologyVersion topVer = 
fut.context().events().topologyVersion();
-
-        // Validate update counters.
-        Map<Integer, Map<UUID, Long>> result = 
validatePartitionsUpdateCounters(top, messages, ignoringNodes);
-        if (!result.isEmpty())
-            throw new IgniteCheckedException("Partitions update counters are 
inconsistent for " + fold(topVer, result));
-
-        // For sizes validation ignore also nodes which are not able to send 
cache sizes.
-        for (UUID id : messages.keySet()) {
-            ClusterNode node = cctx.discovery().node(id);
-            if (node != null && 
node.version().compareTo(SIZES_VALIDATION_AVAILABLE_SINCE) < 0)
-                ignoringNodes.add(id);
-        }
-
-        if (!MvccUtils.mvccEnabled(cctx.kernalContext())) { // TODO: Remove 
"if" clause in IGNITE-9451.
-            // Validate cache sizes.
-            result = validatePartitionsSizes(top, messages, ignoringNodes);
-
-            if (!result.isEmpty())
-                throw new IgniteCheckedException("Partitions cache sizes are 
inconsistent for " + fold(topVer, result));
-        }
-    }
-
-    /**
-     * Checks what partitions from given {@code singleMsg} message should be 
excluded from validation.
-     *
-     * @param top Topology to validate.
-     * @param nodeId Node which sent single message.
-     * @param singleMsg Single message.
-     * @return Set of partition ids should be excluded from validation.
-     */
-    @Nullable private Set<Integer> shouldIgnore(GridDhtPartitionTopology top, 
UUID nodeId, GridDhtPartitionsSingleMessage singleMsg) {
-        CachePartitionPartialCountersMap countersMap = 
singleMsg.partitionUpdateCounters(top.groupId(), top.partitions());
-        Map<Integer, Long> sizesMap = singleMsg.partitionSizes(top.groupId());
-
-        Set<Integer> ignore = null;
-
-        for (int p = 0; p < top.partitions(); p++) {
-            if (top.partitionState(nodeId, p) != GridDhtPartitionState.OWNING) 
{
-                if (ignore == null)
-                    ignore = new HashSet<>();
-
-                ignore.add(p);
-
-                continue;
-            }
-
-            int partIdx = countersMap.partitionIndex(p);
-            long updateCounter = partIdx >= 0 ? 
countersMap.updateCounterAt(partIdx) : 0;
-            long size = sizesMap.containsKey(p) ? sizesMap.get(p) : 0;
-
-            // Do not validate partitions with zero update counter and size.
-            if (updateCounter == 0 && size == 0) {
-                if (ignore == null)
-                    ignore = new HashSet<>();
-
-                ignore.add(p);
-            }
-        }
-
-        return ignore;
-    }
-
-    /**
-     * Validate partitions update counters for given {@code top}.
-     *
-     * @param top Topology to validate.
-     * @param messages Single messages received from all nodes.
-     * @param ignoringNodes Nodes for what we ignore validation.
-     * @return Invalid partitions map with following structure: (partId, 
(nodeId, updateCounter)).
-     * If map is empty validation is successful.
-     */
-     Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
-            GridDhtPartitionTopology top,
-            Map<UUID, GridDhtPartitionsSingleMessage> messages,
-            Set<UUID> ignoringNodes) {
-        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
-
-        Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new 
HashMap<>();
-
-        // Populate counters statistics from local node partitions.
-        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
-            if (part.state() != GridDhtPartitionState.OWNING)
-                continue;
-
-            if (part.updateCounter() == 0 && part.fullSize() == 0)
-                continue;
-
-            updateCountersAndNodesByPartitions.put(part.id(), new 
T2<>(cctx.localNodeId(), part.updateCounter()));
-        }
-
-        int partitions = top.partitions();
-
-        // Then process and validate counters from other nodes.
-        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
messages.entrySet()) {
-            UUID nodeId = e.getKey();
-            if (ignoringNodes.contains(nodeId))
-                continue;
-
-            CachePartitionPartialCountersMap countersMap = 
e.getValue().partitionUpdateCounters(top.groupId(), partitions);
-
-            Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, 
e.getValue());
-
-            for (int part = 0; part < partitions; part++) {
-                if (ignorePartitions != null && 
ignorePartitions.contains(part))
-                    continue;
-
-                int partIdx = countersMap.partitionIndex(part);
-                long currentCounter = partIdx >= 0 ? 
countersMap.updateCounterAt(partIdx) : 0;
-
-                process(invalidPartitions, updateCountersAndNodesByPartitions, 
part, nodeId, currentCounter);
-            }
-        }
-
-        return invalidPartitions;
-    }
-
-    /**
-     * Validate partitions cache sizes for given {@code top}.
-     *
-     * @param top Topology to validate.
-     * @param messages Single messages received from all nodes.
-     * @param ignoringNodes Nodes for what we ignore validation.
-     * @return Invalid partitions map with following structure: (partId, 
(nodeId, cacheSize)).
-     * If map is empty validation is successful.
-     */
-     Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
-            GridDhtPartitionTopology top,
-            Map<UUID, GridDhtPartitionsSingleMessage> messages,
-            Set<UUID> ignoringNodes) {
-        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
-
-        Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new 
HashMap<>();
-
-        // Populate sizes statistics from local node partitions.
-        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
-            if (part.state() != GridDhtPartitionState.OWNING)
-                continue;
-
-            if (part.updateCounter() == 0 && part.fullSize() == 0)
-                continue;
-
-            sizesAndNodesByPartitions.put(part.id(), new 
T2<>(cctx.localNodeId(), part.fullSize()));
-        }
-
-        int partitions = top.partitions();
-
-        // Then process and validate sizes from other nodes.
-        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
messages.entrySet()) {
-            UUID nodeId = e.getKey();
-            if (ignoringNodes.contains(nodeId))
-                continue;
-
-            Map<Integer, Long> sizesMap = 
e.getValue().partitionSizes(top.groupId());
-
-            Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, 
e.getValue());
-
-            for (int part = 0; part < partitions; part++) {
-                if (ignorePartitions != null && 
ignorePartitions.contains(part))
-                    continue;
-
-                long currentSize = sizesMap.containsKey(part) ? 
sizesMap.get(part) : 0L;
-
-                process(invalidPartitions, sizesAndNodesByPartitions, part, 
nodeId, currentSize);
-            }
-        }
-
-        return invalidPartitions;
-    }
-
-    /**
-     * Processes given {@code counter} for partition {@code part} reported by 
{@code node}.
-     * Populates {@code invalidPartitions} map if existing counter and current 
{@code counter} are different.
-     *
-     * @param invalidPartitions Invalid partitions map.
-     * @param countersAndNodes Current map of counters and nodes by partitions.
-     * @param part Processing partition.
-     * @param node Node id.
-     * @param counter Counter value reported by {@code node}.
-     */
-    private void process(Map<Integer, Map<UUID, Long>> invalidPartitions,
-                         Map<Integer, T2<UUID, Long>> countersAndNodes,
-                         int part,
-                         UUID node,
-                         long counter) {
-        T2<UUID, Long> existingData = countersAndNodes.get(part);
-
-        if (existingData == null)
-            countersAndNodes.put(part, new T2<>(node, counter));
-
-        if (existingData != null && counter != existingData.get2()) {
-            if (!invalidPartitions.containsKey(part)) {
-                Map<UUID, Long> map = new HashMap<>();
-                map.put(existingData.get1(), existingData.get2());
-                invalidPartitions.put(part, map);
-            }
-
-            invalidPartitions.get(part).put(node, counter);
-        }
-    }
-
-    /**
-     * Folds given map of invalid partition states to string representation in 
the following format:
-     * Part [id]: [consistentId=value*]
-     *
-     * Value can be both update counter or cache size.
-     *
-     * @param topVer Last topology version.
-     * @param invalidPartitions Invalid partitions map.
-     * @return String representation of invalid partitions.
-     */
-    private String fold(AffinityTopologyVersion topVer, Map<Integer, Map<UUID, 
Long>> invalidPartitions) {
-        SB sb = new SB();
-
-        NavigableMap<Integer, Map<UUID, Long>> sortedPartitions = new 
TreeMap<>(invalidPartitions);
-
-        for (Map.Entry<Integer, Map<UUID, Long>> p : 
sortedPartitions.entrySet()) {
-            sb.a("Part ").a(p.getKey()).a(": [");
-            for (Map.Entry<UUID, Long> e : p.getValue().entrySet()) {
-                Object consistentId = cctx.discovery().node(topVer, 
e.getKey()).consistentId();
-                sb.a(consistentId).a("=").a(e.getValue()).a(" ");
-            }
-            sb.a("] ");
-        }
-
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index f6df80b..4480dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -52,6 +52,9 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLo
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 2b34a41..ad164e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -46,6 +46,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index a0c9d15..ffa383b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 609bc4a..6662a1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -38,6 +38,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index a76844a..39e0774 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -41,10 +41,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import 
org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotResponseListener;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index fd9bc77..f5689f9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import 
org.apache.ignite.internal.processors.cache.GridCacheUtils.BackupPostProcessingClosure;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
@@ -60,7 +61,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java
deleted file mode 100644
index 780ca91..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionsEvictManager.java
+++ /dev/null
@@ -1,569 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVICTION_PERMITS;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
-import static org.apache.ignite.IgniteSystemProperties.getLong;
-
-/**
- * Class that serves asynchronous part eviction process.
- * Multiple partition from group can be evicted at the same time.
- */
-public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
-
-    /** Default eviction progress show frequency. */
-    private static final int DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 
1000; // 2 Minutes.
-
-    /** Eviction progress frequency property name. */
-    private static final String SHOW_EVICTION_PROGRESS_FREQ = 
"SHOW_EVICTION_PROGRESS_FREQ";
-
-    /** Eviction thread pool policy. */
-    private static final byte EVICT_POOL_PLC = GridIoPolicy.SYSTEM_POOL;
-
-    /** Eviction progress frequency in ms. */
-    private final long evictionProgressFreqMs = 
getLong(SHOW_EVICTION_PROGRESS_FREQ, DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS);
-
-    /** */
-    private final int confPermits = getInteger(IGNITE_EVICTION_PERMITS, -1);
-
-    /** Next time of show eviction progress. */
-    private long nextShowProgressTime;
-
-    private final Map<Integer, GroupEvictionContext> evictionGroupsMap = new 
ConcurrentHashMap<>();
-
-    /** Flag indicates that eviction process has stopped. */
-    private volatile boolean stop;
-
-    /** Check stop eviction context. */
-    private final EvictionContext sharedEvictionContext = () -> stop;
-
-    /** Number of maximum concurrent operations. */
-    private volatile int threads;
-
-    /** How many eviction task may execute concurrent. */
-    private volatile int permits;
-
-    /** Bucket queue for load balance partitions to the threads via count of 
partition size.
-     *  Is not thread-safe.
-     *  All method should be called under mux synchronization.
-     */
-    private volatile BucketQueue evictionQueue;
-
-    /** Lock object. */
-    private final Object mux = new Object();
-
-    /**
-     * Stops eviction process for group.
-     *
-     * Method awaits last offered partition eviction.
-     *
-     * @param grp Group context.
-     */
-    public void onCacheGroupStopped(CacheGroupContext  grp){
-        GroupEvictionContext groupEvictionContext = 
evictionGroupsMap.remove(grp.groupId());
-
-        if (groupEvictionContext != null){
-            groupEvictionContext.stop();
-
-            groupEvictionContext.awaitFinishAll();
-        }
-    }
-
-    /**
-     * Adds partition to eviction queue and starts eviction process if permit 
available.
-     *
-     * @param grp Group context.
-     * @param part Partition to evict.
-     */
-    public void evictPartitionAsync(CacheGroupContext grp, 
GridDhtLocalPartition part) {
-        // Check node stop.
-        if (sharedEvictionContext.shouldStop())
-            return;
-
-        GroupEvictionContext groupEvictionContext = 
evictionGroupsMap.computeIfAbsent(
-            grp.groupId(), (k) -> new GroupEvictionContext(grp));
-
-        PartitionEvictionTask evictionTask = 
groupEvictionContext.createEvictPartitionTask(part);
-
-        if (evictionTask == null)
-            return;
-
-        int bucket;
-
-        synchronized (mux) {
-            bucket = evictionQueue.offer(evictionTask);
-        }
-
-        scheduleNextPartitionEviction(bucket);
-    }
-
-    /**
-     * Gets next partition from the queue and schedules it for eviction.
-     *
-     * @param bucket Bucket.
-     */
-    private void scheduleNextPartitionEviction(int bucket) {
-        // Check node stop.
-        if (sharedEvictionContext.shouldStop())
-            return;
-
-        synchronized (mux) {
-            // Check that we have permits for next operation.
-            if (permits > 0) {
-                // If queue is empty not need to do.
-                if (evictionQueue.isEmpty())
-                    return;
-
-                // Get task until we have permits.
-                while (permits >= 0) {
-                    // Get task from bucket.
-                    PartitionEvictionTask evictionTask = 
evictionQueue.poll(bucket);
-
-                    // If bucket empty try get from another.
-                    if (evictionTask == null) {
-                        // Until queue have tasks.
-                        while (!evictionQueue.isEmpty()) {
-                            // Get task from any other bucket.
-                            evictionTask = evictionQueue.pollAny();
-
-                            // Stop iteration if we found task.
-                            if (evictionTask != null)
-                                break;
-                        }
-
-                        // If task not found no need to do some.
-                        if (evictionTask == null)
-                            return;
-                    }
-
-                    // Print current eviction progress.
-                    showProgress();
-
-                    GroupEvictionContext groupEvictionContext = 
evictionTask.groupEvictionContext;
-
-                    // Check that group or node stopping.
-                    if (groupEvictionContext.shouldStop())
-                        continue;
-
-                    // Get permit for this task.
-                    permits--;
-
-                    // Register task future, may need if group or node will be 
stopped.
-                    groupEvictionContext.taskScheduled(evictionTask);
-
-                    evictionTask.finishFut.listen(f -> {
-                        synchronized (mux) {
-                            // Return permit after task completed.
-                            permits++;
-                        }
-
-                        // Re-schedule new one task form same bucket.
-                        scheduleNextPartitionEviction(bucket);
-                    });
-
-                    // Submit task to executor.
-                     cctx.kernalContext()
-                        .closure()
-                        .runLocalSafe(evictionTask, EVICT_POOL_PLC);
-                }
-            }
-        }
-    }
-
-    /**
-     * Shows progress of eviction.
-     */
-    private void showProgress() {
-        if (U.currentTimeMillis() >= nextShowProgressTime) {
-            int size = evictionQueue.size() + 1; // Queue size plus current 
partition.
-
-            if (log.isInfoEnabled())
-                log.info("Eviction in progress [permits=" + permits+
-                    ", threads=" + threads +
-                    ", groups=" + evictionGroupsMap.keySet().size() +
-                    ", remainingPartsToEvict=" + size + "]");
-
-            
evictionGroupsMap.values().forEach(GroupEvictionContext::showProgress);
-
-            nextShowProgressTime = U.currentTimeMillis() + 
evictionProgressFreqMs;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        super.start0();
-
-        // If property is not setup, calculate permits as parts of sys pool.
-        if (confPermits == -1) {
-            int sysPoolSize = 
cctx.kernalContext().config().getSystemThreadPoolSize();
-
-            threads = permits = sysPoolSize / 4;
-        }
-        else
-            threads = permits = confPermits;
-
-        // Avoid 0 permits if sys pool size less that 4.
-        if (threads == 0)
-            threads = permits = 1;
-
-        log.info("Evict partition permits=" + permits);
-
-        evictionQueue = new BucketQueue(threads);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0(boolean cancel) {
-        super.stop0(cancel);
-
-        stop = true;
-
-        Collection<GroupEvictionContext> evictionGrps = 
evictionGroupsMap.values();
-
-        evictionGrps.forEach(GroupEvictionContext::stop);
-
-        evictionGrps.forEach(GroupEvictionContext::awaitFinishAll);
-    }
-
-    /**
-     *
-     */
-    private class GroupEvictionContext implements EvictionContext {
-        /** */
-        private final CacheGroupContext grp;
-
-        /** Deduplicate set partition ids. */
-        private final Set<Integer> partIds = new GridConcurrentHashSet<>();
-
-        /** Future for currently running partition eviction task. */
-        private final Map<Integer, IgniteInternalFuture<?>> partsEvictFutures 
= new ConcurrentHashMap<>();
-
-        /** Flag indicates that eviction process has stopped for this group. */
-        private volatile boolean stop;
-
-        /** Total partition to evict. */
-        private AtomicInteger totalTasks = new AtomicInteger();
-
-        /** Total partition evict in progress. */
-        private int taskInProgress;
-
-        /**
-         * @param grp Group context.
-         */
-        private GroupEvictionContext(CacheGroupContext grp) {
-            this.grp = grp;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean shouldStop() {
-            return stop || sharedEvictionContext.shouldStop();
-        }
-
-        /**
-         *
-         * @param part Grid local partition.
-         */
-        private PartitionEvictionTask 
createEvictPartitionTask(GridDhtLocalPartition part){
-            if (shouldStop() || !partIds.add(part.id()))
-                return null;
-
-            totalTasks.incrementAndGet();
-
-            return new PartitionEvictionTask(part, this);
-        }
-
-        /**
-         *
-         * @param task Partition eviction task.
-         */
-        private synchronized void taskScheduled(PartitionEvictionTask task) {
-            if (shouldStop())
-                return;
-
-            taskInProgress++;
-
-            GridFutureAdapter<?> fut = task.finishFut;
-
-            int partId = task.part.id();
-
-            partsEvictFutures.put(partId, fut);
-
-            fut.listen(f -> {
-                synchronized (this) {
-                    taskInProgress--;
-
-                    partsEvictFutures.remove(partId, f);
-
-                    if (totalTasks.decrementAndGet() == 0)
-                        evictionGroupsMap.remove(grp.groupId());
-                }
-            });
-        }
-
-        /**
-         * Stop eviction for group.
-         */
-        private void stop() {
-            stop = true;
-        }
-
-        /**
-         * Await evict finish.
-         */
-        private void awaitFinishAll(){
-            partsEvictFutures.forEach(this::awaitFinish);
-
-            evictionGroupsMap.remove(grp.groupId());
-        }
-
-        /**
-         * Await evict finish partition.
-         */
-        private void awaitFinish(Integer part, IgniteInternalFuture<?> fut) {
-            // Wait for last offered partition eviction completion
-            try {
-                log.info("Await partition evict, grpName=" + 
grp.cacheOrGroupName() +
-                    ", grpId=" + grp.groupId() + ", partId=" + part);
-
-                fut.get();
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.warning("Failed to await partition eviction during 
stopping.", e);
-            }
-        }
-
-        /**
-         * Shows progress group of eviction.
-         */
-        private void showProgress() {
-            if (log.isInfoEnabled())
-                log.info("Group eviction in progress [grpName=" + 
grp.cacheOrGroupName()+
-                    ", grpId=" + grp.groupId() +
-                    ", remainingPartsToEvict=" + (totalTasks.get() - 
taskInProgress) +
-                    ", partsEvictInProgress=" + taskInProgress +
-                    ", totalParts= " + grp.topology().localPartitions().size() 
+ "]");
-        }
-    }
-
-    /**
-     * Task for self-scheduled partition eviction / clearing.
-     */
-    private class PartitionEvictionTask implements Runnable {
-        /** Partition to evict. */
-        private final GridDhtLocalPartition part;
-
-        private final long size;
-
-        /** Eviction context. */
-        private final GroupEvictionContext groupEvictionContext;
-
-        /** */
-        private final GridFutureAdapter<?> finishFut = new 
GridFutureAdapter<>();
-
-        /**
-         * @param part Partition.
-         */
-        private PartitionEvictionTask(
-            GridDhtLocalPartition part,
-            GroupEvictionContext groupEvictionContext
-        ) {
-            this.part = part;
-            this.groupEvictionContext = groupEvictionContext;
-
-            size = part.fullSize();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            if (groupEvictionContext.shouldStop()) {
-                finishFut.onDone();
-
-                return;
-            }
-
-            try {
-                boolean success = part.tryClear(groupEvictionContext);
-
-                if (success) {
-                    if (part.state() == GridDhtPartitionState.EVICTED && 
part.markForDestroy())
-                        part.destroy();
-                }
-                else // Re-offer partition if clear was unsuccessful due to 
partition reservation.
-                    evictionQueue.offer(this);
-
-                // Complete eviction future before schedule new to prevent 
deadlock with
-                // simultaneous eviction stopping and scheduling new eviction.
-                finishFut.onDone();
-            }
-            catch (Throwable ex) {
-                finishFut.onDone(ex);
-
-                if (cctx.kernalContext().isStopping()) {
-                    LT.warn(log, ex, "Partition eviction failed (current node 
is stopping).",
-                        false,
-                        true);
-                }
-                else{
-                    LT.error(log, ex, "Partition eviction failed, this can 
cause grid hang.");
-                }
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private class BucketQueue {
-        /** Queues contains partitions scheduled for eviction. */
-        private final Queue<PartitionEvictionTask>[] buckets;
-
-        /** */
-        private final long[] bucketSizes;
-
-        /**
-         * @param buckets Number of buckets.
-         */
-        BucketQueue(int buckets) {
-            this.buckets = new Queue[buckets];
-
-            for (int i = 0; i < buckets; i++)
-                this.buckets[i] = createEvictPartitionQueue();
-
-            bucketSizes = new long[buckets];
-        }
-
-        /**
-         * Poll eviction task from queue for specific bucket.
-         *
-         * @param bucket Bucket index.
-         * @return Partition evict task, or {@code null} if bucket queue is 
empty.
-         */
-        PartitionEvictionTask poll(int bucket) {
-            PartitionEvictionTask task = buckets[bucket].poll();
-
-            if (task != null)
-                bucketSizes[bucket] -= task.size;
-
-            return task;
-        }
-
-        /**
-         * Poll eviction task from queue (bucket is not specific).
-         *
-         * @return Partition evict task.
-         */
-        PartitionEvictionTask pollAny() {
-            for (int bucket = 0; bucket < bucketSizes.length; bucket++){
-                if (!buckets[bucket].isEmpty())
-                    return poll(bucket);
-            }
-
-            return null;
-        }
-
-        /**
-         * Offer task to queue.
-         *
-         * @return Bucket index.
-         */
-        int offer(PartitionEvictionTask task) {
-            int bucket = calculateBucket();
-
-            buckets[bucket].offer(task);
-
-            bucketSizes[bucket] += task.size;
-
-            return bucket;
-        }
-
-
-        /**
-         * @return {@code True} if queue is empty, {@code} False if not empty.
-         */
-        boolean isEmpty(){
-            return size() == 0;
-        }
-
-        /**
-         * @return Queue size.
-         */
-        int size(){
-            int size = 0;
-
-            for (Queue<PartitionEvictionTask> queue : buckets) {
-                size += queue.size();
-            }
-
-            return size;
-        }
-
-        /***
-         * @return Bucket index.
-         */
-        private int calculateBucket() {
-            int min = 0;
-
-            for (int bucket = min; bucket < bucketSizes.length; bucket++) {
-                if (bucketSizes[min] > bucketSizes[bucket])
-                    min = bucket;
-            }
-
-            return min;
-        }
-
-        /**
-         * 0 - PRIORITY QUEUE (compare by partition size).
-         * default (any other values) - FIFO.
-         */
-        private static final byte QUEUE_TYPE = 1;
-
-        /**
-         *
-         * @return Queue for evict partitions.
-         */
-        private Queue<PartitionEvictionTask> createEvictPartitionQueue() {
-            switch (QUEUE_TYPE) {
-                case 1:
-                    return new PriorityBlockingQueue<>(
-                        1000, Comparator.comparingLong(p -> 
p.part.fullSize()));
-                default:
-                    return new LinkedBlockingQueue<>();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4c42315..8edefa2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -68,8 +68,8 @@ import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f6de594..4d5fa13 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -50,7 +50,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedUn
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtEmbeddedFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFinishedFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 3b03958..b37acf3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -40,8 +40,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
@@ -61,8 +61,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 29573cb..40defa1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -49,9 +49,9 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
@@ -78,7 +78,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOAD
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
 
@@ -438,8 +438,8 @@ public class GridDhtPartitionDemander {
         }
 
         if (!ctx.kernalContext().grid().isRebalanceEnabled()) {
-            if (log.isDebugEnabled())
-                log.debug("Cancel partition demand because rebalance disabled 
on current node.");
+            if (log.isTraceEnabled())
+                log.trace("Cancel partition demand because rebalance disabled 
on current node.");
 
             fut.cancel();
 
@@ -455,6 +455,8 @@ public class GridDhtPartitionDemander {
 
         final CacheConfiguration cfg = grp.config();
 
+        int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize();
+
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : 
assignments.entrySet()) {
             final ClusterNode node = e.getKey();
 
@@ -467,13 +469,11 @@ public class GridDhtPartitionDemander {
 
                 parts = fut.remaining.get(node.id()).get2();
 
-                U.log(log, "Starting rebalancing [grp=" + 
grp.cacheOrGroupName()
-                        + ", mode=" + cfg.getRebalanceMode() + ", fromNode=" + 
node.id() + ", partitionsCount=" + parts.size()
-                        + ", topology=" + fut.topologyVersion() + ", 
rebalanceId=" + fut.rebalanceId + "]");
+                U.log(log, "Prepared rebalancing [grp=" + 
grp.cacheOrGroupName()
+                        + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + 
node.id() + ", partitionsCount=" + parts.size()
+                        + ", topVer=" + fut.topologyVersion() + ", 
parallelism=" + totalStripes + "]");
             }
 
-            int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize();
-
             int stripes = totalStripes;
 
             final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new 
ArrayList<>(stripes);
@@ -521,10 +521,11 @@ public class GridDhtPartitionDemander {
                                     fut.cleanupRemoteContexts(node.id());
                             }
 
-                            if (log.isDebugEnabled())
-                                log.debug("Requested rebalancing [from node=" 
+ node.id() + ", listener index=" +
-                                    topicId + " " + demandMsg.rebalanceId() + 
", partitions count=" + stripePartitions.get(topicId).size() +
-                                    " (" + 
stripePartitions.get(topicId).partitionsList() + ")]");
+                            if (log.isInfoEnabled())
+                                log.info("Started rebalance routine [" + 
grp.cacheOrGroupName() +
+                                    ", supplier=" + node.id() + ", topic=" + 
topicId +
+                                    ", fullPartitions=" + 
S.compact(stripePartitions.get(topicId).fullSet()) +
+                                    ", histPartitions=" + 
S.compact(stripePartitions.get(topicId).historicalSet()) + "]");
                         }
                         catch (IgniteCheckedException e1) {
                             ClusterTopologyCheckedException cause = 
e1.getCause(ClusterTopologyCheckedException.class);
@@ -613,16 +614,15 @@ public class GridDhtPartitionDemander {
                             }
 
                             if (log.isDebugEnabled())
-                                log.debug("Remaining clearing partitions 
[grp=" + grp.cacheOrGroupName()
-                                    + ", remaining=" + remaining + "]");
+                                log.debug("Partition is ready for rebalance 
[grp=" + grp.cacheOrGroupName()
+                                    + ", p=" + part.id() + ", remaining=" + 
remaining + "]");
 
                             if (remaining == 0)
                                 clearAllFuture.onDone();
                         }
                     }
-                    else {
+                    else
                         clearAllFuture.onDone();
-                    }
                 });
             }
             else {
@@ -636,10 +636,6 @@ public class GridDhtPartitionDemander {
                     }
                 }
 
-                if (log.isDebugEnabled())
-                    log.debug("Remaining clearing partitions [grp=" + 
grp.cacheOrGroupName()
-                        + ", remaining=" + remaining + "]");
-
                 if (remaining == 0)
                     clearAllFuture.onDone();
             }
@@ -660,38 +656,41 @@ public class GridDhtPartitionDemander {
      *
      * @param topicId Topic id.
      * @param nodeId Node id.
-     * @param supply Supply message.
+     * @param supplyMsg Supply message.
      */
     public void handleSupplyMessage(
         int topicId,
         final UUID nodeId,
-        final GridDhtPartitionSupplyMessage supply
+        final GridDhtPartitionSupplyMessage supplyMsg
     ) {
-        AffinityTopologyVersion topVer = supply.topologyVersion();
+        AffinityTopologyVersion topVer = supplyMsg.topologyVersion();
 
         final RebalanceFuture fut = rebalanceFut;
 
         ClusterNode node = ctx.node(nodeId);
 
-        if (node == null)
-            return;
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Supply message ignored (supplier has left cluster) 
[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
 
-        if (topologyChanged(fut)) // Topology already changed (for the future 
that supply message based on).
             return;
+        }
+
+        // Topology has already changed (for the future that the supply message is 
based on), or the supply message belongs to another rebalance future.
+        if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) {
+            if (log.isDebugEnabled())
+                log.debug("Supply message ignored (topology changed) [" + 
demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
 
-        if (!fut.isActual(supply.rebalanceId())) {
-            // Current future have another rebalance id.
-            // Supple message based on another future.
             return;
         }
 
         if (log.isDebugEnabled())
-            log.debug("Received supply message [grp=" + grp.cacheOrGroupName() 
+ ", msg=" + supply + ']');
+            log.debug("Received supply message [" + demandRoutineInfo(topicId, 
nodeId, supplyMsg) + "]");
 
         // Check whether there were error during supply message unmarshalling 
process.
-        if (supply.classError() != null) {
-            U.warn(log, "Rebalancing from node cancelled [grp=" + 
grp.cacheOrGroupName() + ", node=" + nodeId +
-                "]. Supply message couldn't be unmarshalled: " + 
supply.classError());
+        if (supplyMsg.classError() != null) {
+            U.warn(log, "Rebalancing from node cancelled [" + 
demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
+                ". Supply message couldn't be unmarshalled: " + 
supplyMsg.classError());
 
             fut.cancel(nodeId);
 
@@ -699,9 +698,9 @@ public class GridDhtPartitionDemander {
         }
 
         // Check whether there were error during supplying process.
-        if (supply.error() != null) {
-            U.warn(log, "Rebalancing from node cancelled [grp=" + 
grp.cacheOrGroupName() + ", node=" + nodeId +
-                "]. Supplier has failed with error: " + supply.error());
+        if (supplyMsg.error() != null) {
+            U.warn(log, "Rebalancing from node cancelled [" + 
demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
+                ". Supplier has failed with error: " + supplyMsg.error());
 
             fut.cancel(nodeId);
 
@@ -713,13 +712,13 @@ public class GridDhtPartitionDemander {
         if (grp.sharedGroup()) {
             for (GridCacheContext cctx : grp.caches()) {
                 if (cctx.statisticsEnabled()) {
-                    long keysCnt = supply.keysForCache(cctx.cacheId());
+                    long keysCnt = supplyMsg.keysForCache(cctx.cacheId());
 
                     if (keysCnt != -1)
                         
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
 
                     // Can not be calculated per cache.
-                    
cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+                    
cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
                 }
             }
         }
@@ -727,10 +726,10 @@ public class GridDhtPartitionDemander {
             GridCacheContext cctx = grp.singleCacheContext();
 
             if (cctx.statisticsEnabled()) {
-                if (supply.estimatedKeysCount() != -1)
-                    
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount());
+                if (supplyMsg.estimatedKeysCount() != -1)
+                    
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount());
 
-                
cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+                
cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
             }
         }
 
@@ -738,7 +737,7 @@ public class GridDhtPartitionDemander {
             AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
             // Preload.
-            for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supply.infos().entrySet()) {
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supplyMsg.infos().entrySet()) {
                 int p = e.getKey();
 
                 if (aff.get(p).contains(ctx.localNode())) {
@@ -746,7 +745,7 @@ public class GridDhtPartitionDemander {
 
                     assert part != null;
 
-                    boolean last = supply.last().containsKey(p);
+                    boolean last = supplyMsg.last().containsKey(p);
 
                     if (part.state() == MOVING) {
                         boolean reserved = part.reserve();
@@ -795,7 +794,8 @@ public class GridDhtPartitionDemander {
                                 fut.partitionDone(nodeId, p, true);
 
                                 if (log.isDebugEnabled())
-                                    log.debug("Finished rebalancing partition: 
" + part);
+                                    log.debug("Finished rebalancing partition: 
" +
+                                        "[" + demandRoutineInfo(topicId, 
nodeId, supplyMsg) + ", p=" + p + "]");
                             }
                         }
                         finally {
@@ -808,29 +808,31 @@ public class GridDhtPartitionDemander {
                             fut.partitionDone(nodeId, p, false);
 
                         if (log.isDebugEnabled())
-                            log.debug("Skipping rebalancing partition (state 
is not MOVING): " + part);
+                            log.debug("Skipping rebalancing partition (state 
is not MOVING): " +
+                                "[" + demandRoutineInfo(topicId, nodeId, 
supplyMsg) + ", p=" + p + "]");
                     }
                 }
                 else {
                     fut.partitionDone(nodeId, p, false);
 
                     if (log.isDebugEnabled())
-                        log.debug("Skipping rebalancing partition (it does not 
belong on current node): " + p);
+                        log.debug("Skipping rebalancing partition (affinity 
changed): " +
+                            "[" + demandRoutineInfo(topicId, nodeId, 
supplyMsg) + ", p=" + p + "]");
                 }
             }
 
             // Only request partitions based on latest topology version.
-            for (Integer miss : supply.missed()) {
+            for (Integer miss : supplyMsg.missed()) {
                 if (aff.get(miss).contains(ctx.localNode()))
                     fut.partitionMissed(nodeId, miss);
             }
 
-            for (Integer miss : supply.missed())
+            for (Integer miss : supplyMsg.missed())
                 fut.partitionDone(nodeId, miss, false);
 
             GridDhtPartitionDemandMessage d = new 
GridDhtPartitionDemandMessage(
-                supply.rebalanceId(),
-                supply.topologyVersion(),
+                supplyMsg.rebalanceId(),
+                supplyMsg.topologyVersion(),
                 grp.groupId());
 
             d.timeout(grp.config().getRebalanceTimeout());
@@ -842,18 +844,24 @@ public class GridDhtPartitionDemander {
                 try {
                     ctx.io().sendOrderedMessage(node, 
rebalanceTopics.get(topicId),
                         d.convertIfNeeded(node.version()), grp.ioPolicy(), 
grp.config().getRebalanceTimeout());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Send next demand message [" + 
demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
                 }
                 catch (ClusterTopologyCheckedException e) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Node left during rebalancing [grp=" + 
grp.cacheOrGroupName() +
-                            ", node=" + node.id() + ", msg=" + e.getMessage() 
+ ']');
-                    }
+                    if (log.isDebugEnabled())
+                        log.debug("Supplier has left [" + 
demandRoutineInfo(topicId, nodeId, supplyMsg) +
+                            ", errMsg=" + e.getMessage() + ']');
                 }
             }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Will not request next demand message [" + 
demandRoutineInfo(topicId, nodeId, supplyMsg) +
+                        ", topChanged=" + topologyChanged(fut) + ", 
rebalanceFuture=" + fut + "]");
+            }
         }
         catch (IgniteSpiException | IgniteCheckedException e) {
-            LT.error(log, e, "Error during rebalancing [grp=" + 
grp.cacheOrGroupName() +
-                ", srcNode=" + node.id() +
+            LT.error(log, e, "Error during rebalancing [" + 
demandRoutineInfo(topicId, nodeId, supplyMsg) +
                 ", err=" + e + ']');
         }
     }
@@ -946,6 +954,17 @@ public class GridDhtPartitionDemander {
         return true;
     }
 
+    /**
+     * String representation of demand routine.
+     *
+     * @param topicId Topic id.
+     * @param supplier Supplier.
+     * @param supplyMsg Supply message.
+     */
+    private String demandRoutineInfo(int topicId, UUID supplier, 
GridDhtPartitionSupplyMessage supplyMsg) {
+        return "grp=" + grp.cacheOrGroupName() + ", topVer=" + 
supplyMsg.topologyVersion() + ", supplier=" + supplier + ", topic=" + topicId;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtPartitionDemander.class, this);
@@ -980,6 +999,9 @@ public class GridDhtPartitionDemander {
         /** Unique (per demander) rebalance id. */
         private final long rebalanceId;
 
+        /** The number of rebalance routines. */
+        private final long routines;
+
         /**
          * @param grp Cache group.
          * @param assignments Assignments.
@@ -1003,6 +1025,8 @@ public class GridDhtPartitionDemander {
                 remaining.put(k.id(), new T2<>(U.currentTimeMillis(), 
v.partitions()));
             });
 
+            this.routines = remaining.size();
+
             this.grp = grp;
             this.log = log;
             this.rebalanceId = rebalanceId;
@@ -1020,6 +1044,7 @@ public class GridDhtPartitionDemander {
             this.grp = null;
             this.log = null;
             this.rebalanceId = -1;
+            this.routines = 0;
         }
 
         /**
@@ -1054,7 +1079,8 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return true;
 
-                U.log(log, "Cancelled rebalancing from all nodes [topology=" + 
topologyVersion() + ']');
+                U.log(log, "Cancelled rebalancing from all nodes [grp=" + 
grp.cacheOrGroupName() +
+                    ", topVer=" + topologyVersion() + "]");
 
                 if (!ctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())
@@ -1077,8 +1103,8 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                U.log(log, ("Cancelled rebalancing [cache=" + 
grp.cacheOrGroupName() +
-                    ", fromNode=" + nodeId + ", topology=" + topologyVersion() 
+
+                U.log(log, ("Cancelled rebalancing [grp=" + 
grp.cacheOrGroupName() +
+                    ", supplier=" + nodeId + ", topVer=" + topologyVersion() +
                     ", time=" + (U.currentTimeMillis() - 
remaining.get(nodeId).get1()) + " ms]"));
 
                 cleanupRemoteContexts(nodeId);
@@ -1134,7 +1160,7 @@ public class GridDhtPartitionDemander {
             }
             catch (IgniteCheckedException ignored) {
                 if (log.isDebugEnabled())
-                    log.debug("Failed to send failover context cleanup request 
to node");
+                    log.debug("Failed to send failover context cleanup request 
to node " + nodeId);
             }
         }
 
@@ -1166,11 +1192,14 @@ public class GridDhtPartitionDemander {
                     ", part=" + p + ", left=" + parts + "]";
 
                 if (parts.isEmpty()) {
-                    U.log(log, "Completed " + ((remaining.size() == 1 ? 
"(final) " : "") +
-                            "rebalancing [fromNode=" + nodeId +
-                            ", cacheOrGroup=" + grp.cacheOrGroupName() +
-                            ", topology=" + topologyVersion() +
-                        ", time=" + (U.currentTimeMillis() - t.get1()) + " 
ms]"));
+                    int remainingRoutines = remaining.size() - 1;
+
+                    U.log(log, "Completed " + ((remainingRoutines == 0 ? 
"(final) " : "") +
+                            "rebalancing [grp=" + grp.cacheOrGroupName() +
+                            ", supplier=" + nodeId +
+                            ", topVer=" + topologyVersion() +
+                            ", progress=" + (routines - remainingRoutines) + 
"/" + routines +
+                            ", time=" + (U.currentTimeMillis() - t.get1()) + " 
ms]"));
 
                     remaining.remove(nodeId);
                 }
@@ -1215,6 +1244,10 @@ public class GridDhtPartitionDemander {
                 if (log.isInfoEnabled())
                     log.info("Completed rebalance future: " + this);
 
+                if (log.isDebugEnabled())
+                    log.debug("Partitions have been scheduled to resend 
[reason=" +
+                        "Rebalance is done [grp=" + grp.cacheOrGroupName() + 
"]]");
+
                 ctx.exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b5c0687/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index 28c8c84..e84869d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -27,12 +27,12 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 
 /**
  * Partition map.

Reply via email to