http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
deleted file mode 100644
index fd3ce98..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ /dev/null
@@ -1,1195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import static 
org.gridgain.grid.kernal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-
-/**
- * Partition topology.
- */
-@GridToStringExclude
-class GridDhtPartitionTopologyImpl<K, V> implements 
GridDhtPartitionTopology<K, V> {
-    /** If true, then check consistency. */
-    private static final boolean CONSISTENCY_CHECK = false;
-
-    /** If true, map-to-string helpers use {@code toFullString()} instead of {@code toString()}. */
-    private static final boolean FULL_MAP_DEBUG = false;
-
-    /** Cache context. */
-    private final GridCacheContext<K, V> cctx;
-
-    /** Logger obtained from the cache context. */
-    private final IgniteLogger log;
-
-    /** Local partitions keyed by partition number. */
-    private final ConcurrentMap<Integer, GridDhtLocalPartition<K, V>> locParts 
=
-        new ConcurrentHashMap8<>();
-
-    /** Node to partition map ({@code null} until it is created locally or received in an update). */
-    private GridDhtPartitionFullMap node2part;
-
-    /** Partition to node map: partition number to IDs of nodes that list this partition. */
-    private Map<Integer, Set<UUID>> part2node = new HashMap<>();
-
-    /** ID of the last exchange recorded by the {@code update(...)} methods. */
-    private GridDhtPartitionExchangeId lastExchangeId;
-
-    /** Current topology version ({@code -1} until {@code updateTopologyVersion(...)} is called). */
-    private long topVer = -1;
-
-    /** A future that will be completed when topology with version {@code topVer} is ready to use. */
-    private GridDhtTopologyFuture topReadyFut;
-
-    /** Local update sequence counter (starts at 1). */
-    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
-
-    /** Read-write lock acquired by the methods of this class around topology state access. */
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-    /**
-     * Creates partition topology for the given cache context and obtains the logger from it.
-     *
-     * @param cctx Context.
-     */
-    GridDhtPartitionTopologyImpl(GridCacheContext<K, V> cctx) {
-        this.cctx = cctx;
-        this.log = cctx.logger(getClass());
-    }
-
-    /**
-     * Renders the current node-to-partition map for log output.
-     *
-     * @return {@code "null"} if the map is not set, otherwise its full or regular string form
-     *      depending on {@code FULL_MAP_DEBUG}.
-     */
-    @SuppressWarnings( {"ConstantConditions"})
-    private String fullMapString() {
-        if (node2part == null)
-            return "null";
-
-        if (FULL_MAP_DEBUG)
-            return node2part.toFullString();
-
-        return node2part.toString();
-    }
-
-    /**
-     * Renders the given partition map for log output.
-     *
-     * @param map Map to get string for (may be {@code null}).
-     * @return {@code "null"} for a {@code null} map, otherwise its full or regular string form
-     *      depending on {@code FULL_MAP_DEBUG}.
-     */
-    @SuppressWarnings( {"ConstantConditions"})
-    private String mapString(GridDhtPartitionMap map) {
-        if (map == null)
-            return "null";
-
-        if (FULL_MAP_DEBUG)
-            return map.toFullString();
-
-        return map.toString();
-    }
-
-    /**
-     * Synchronously waits for every local partition found in RENTING or EVICTED state
-     * and removes each such partition from the local partition map.
-     *
-     * @return {@code True} if mapping was changed, i.e. at least one partition was removed.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean waitForRent() throws IgniteCheckedException {
-        boolean removedAny = false;
-
-        Iterator<GridDhtLocalPartition<K, V>> parts = locParts.values().iterator();
-
-        while (parts.hasNext()) {
-            GridDhtLocalPartition<K, V> part = parts.next();
-
-            GridDhtPartitionState partState = part.state();
-
-            // Partitions in any other state are left untouched.
-            if (partState != RENTING && partState != EVICTED)
-                continue;
-
-            if (log.isDebugEnabled())
-                log.debug("Waiting for renting partition: " + part);
-
-            // Block until the partition has emptied out.
-            part.rent(true).get();
-
-            if (log.isDebugEnabled())
-                log.debug("Finished waiting for renting partition: " + part);
-
-            // Drop the evicted partition from the local map.
-            parts.remove();
-
-            removedAny = true;
-        }
-
-        return removedAny;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"})
-    @Override public void readLock() {
-        lock.readLock().lock();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readUnlock() {
-        lock.readLock().unlock();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Under the write lock stores the topology version of the given exchange and the future
-     * that tracks readiness of that version. The new version is asserted to be greater than the current one.
-     */
-    @Override public void updateTopologyVersion(GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture<K, V> exchFut) {
-        Lock wrLock = lock.writeLock();
-
-        wrLock.lock();
-
-        try {
-            long newTopVer = exchId.topologyVersion();
-
-            assert newTopVer > topVer : "Invalid topology version [topVer=" + topVer +
-                ", exchId=" + exchId + ']';
-
-            topVer = newTopVer;
-            topReadyFut = exchFut;
-        }
-        finally {
-            wrLock.unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the current topology version under the read lock; the version is asserted to be positive.
-     */
-    @Override public long topologyVersion() {
-        Lock rdLock = lock.readLock();
-
-        rdLock.lock();
-
-        try {
-            long ver = topVer;
-
-            assert ver > 0;
-
-            return ver;
-        }
-        finally {
-            rdLock.unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the topology-ready future under the read lock; the future is asserted to be set.
-     */
-    @Override public GridDhtTopologyFuture topologyVersionFuture() {
-        Lock rdLock = lock.readLock();
-
-        rdLock.lock();
-
-        try {
-            GridDhtTopologyFuture fut = topReadyFut;
-
-            assert fut != null;
-
-            return fut;
-        }
-        finally {
-            rdLock.unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Waits for renting partitions, then under the write lock:
-     * <ul>
-     * <li>removes the exchange node from the maps if the exchange is not a join;</li>
-     * <li>creates or re-creates the full map when the local node is the oldest node;</li>
-     * <li>with preloading enabled, creates local partitions that belong to the local node
-     *     (owning them right away if this is the first node in grid);</li>
-     * <li>with preloading disabled, rents active partitions that do not belong to the local node
-     *     and pre-creates the ones that do;</li>
-     * <li>checks evictions if the full map is valid.</li>
-     * </ul>
-     * After the lock is released, waits for renting partitions once more.
-     *
-     * @param exchId Exchange ID; its topology version is asserted to match the current one.
-     * @throws IgniteCheckedException If waiting for renting partitions failed.
-     */
-    @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException {
-        waitForRent();
-
-        ClusterNode loc = cctx.localNode();
-
-        int num = cctx.affinity().partitions();
-
-        lock.writeLock().lock();
-
-        try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
-
-            if (!exchId.isJoined())
-                removeNode(exchId.nodeId());
-
-            // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx, topVer);
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
-
-            // Single sequence value shared by all local updates made during this call.
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            // If this is the oldest node.
-            if (oldest.id().equals(loc.id())) {
-                if (node2part == null) {
-                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Created brand new full topology map on oldest node [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-                else if (!node2part.valid()) {
-                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
-                            node2part + ']');
-                }
-                else if (!node2part.nodeId().equals(loc.id())) {
-                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-            }
-
-            if (cctx.preloadEnabled()) {
-                for (int p = 0; p < num; p++) {
-                    // If this is the first node in grid.
-                    if (oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) {
-                        assert exchId.isJoined();
-
-                        try {
-                            GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false);
-
-                            assert locPart != null;
-
-                            boolean owned = locPart.own();
-
-                            // Message fixed: '=' was missing after 'cacheName'.
-                            assert owned : "Failed to own partition for oldest node [cacheName=" + cctx.name() +
-                                ", part=" + locPart + ']';
-
-                            if (log.isDebugEnabled())
-                                log.debug("Owned partition for oldest node: " + locPart);
-
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
-                        }
-                        catch (GridDhtInvalidPartitionException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Ignoring invalid partition on oldest node (no need to create a partition " +
-                                    "if it no longer belongs to local node: " + e.partition());
-                        }
-                    }
-                    // If this is not the first node in grid.
-                    else {
-                        if (node2part != null && node2part.valid()) {
-                            if (cctx.affinity().localNode(p, topVer)) {
-                                try {
-                                    // This will make sure that all non-existing partitions
-                                    // will be created in MOVING state.
-                                    GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, true, false);
-
-                                    updateLocal(p, loc.id(), locPart.state(), updateSeq);
-                                }
-                                catch (GridDhtInvalidPartitionException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Ignoring invalid partition (no need to create a partition if it " +
-                                            "no longer belongs to local node: " + e.partition());
-                                }
-                            }
-                        }
-                        // If this node's map is empty, we pre-create local partitions,
-                        // so local map will be sent correctly during exchange.
-                        else if (cctx.affinity().localNode(p, topVer)) {
-                            try {
-                                localPartition(p, topVer, true, false);
-                            }
-                            catch (GridDhtInvalidPartitionException e) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Ignoring invalid partition (no need to pre-create a partition if it " +
-                                        "no longer belongs to local node: " + e.partition());
-                            }
-                        }
-                    }
-                }
-            }
-            else {
-                // If preloader is disabled, then we simply clear out
-                // the partitions this node is not responsible for.
-                for (int p = 0; p < num; p++) {
-                    GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, false, false);
-
-                    boolean belongs = cctx.affinity().localNode(p, topVer);
-
-                    if (locPart != null) {
-                        if (!belongs) {
-                            GridDhtPartitionState state = locPart.state();
-
-                            if (state.active()) {
-                                locPart.rent(false);
-
-                                updateLocal(p, loc.id(), locPart.state(), updateSeq);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Evicting partition with preloading disabled " +
-                                        "(it does not belong to affinity): " + locPart);
-                            }
-                        }
-                    }
-                    else if (belongs) {
-                        try {
-                            // Pre-create partitions.
-                            localPartition(p, topVer, true, false);
-                        }
-                        catch (GridDhtInvalidPartitionException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Ignoring invalid partition with disabled preloader (no need to " +
-                                    "pre-create a partition if it no longer belongs to local node: " + e.partition());
-                        }
-                    }
-                }
-            }
-
-            if (node2part != null && node2part.valid())
-                checkEvictions(updateSeq);
-
-            consistencyCheck();
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
-                    fullMapString() + ']');
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-
-        // Wait for evictions.
-        waitForRent();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Waits for renting partitions, then under the write lock walks over all partitions:
-     * <ul>
-     * <li>a MOVING local partition that belongs to the local node is owned when preloading is enabled and
-     *     the partition has no owners; with preloading disabled its state is only passed to
-     *     {@code updateLocal(...)};</li>
-     * <li>a MOVING local partition that does not belong to the local node is rented.</li>
-     * </ul>
-     *
-     * @param exchId Exchange ID whose topology version is used for affinity lookups.
-     * @return {@code True} if renting removed any partition or a local partition was owned or rented here.
-     * @throws IgniteCheckedException If waiting for renting partitions failed.
-     */
-    @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException {
-        boolean changed = waitForRent();
-
-        ClusterNode loc = cctx.localNode();
-
-        int num = cctx.affinity().partitions();
-
-        // NOTE(review): this local shadows the 'topVer' field, so the assertion below compares the
-        // exchange version with itself and cannot fail - confirm whether the field was intended there.
-        long topVer = exchId.topologyVersion();
-
-        lock.writeLock().lock();
-
-        try {
-            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" +
-                    fullMapString() + ']');
-
-            // Single sequence value shared by all local updates made during this call.
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            for (int p = 0; p < num; p++) {
-                GridDhtLocalPartition<K, V> locPart = localPartition(p, topVer, false, false);
-
-                if (cctx.affinity().localNode(p, topVer)) {
-                    // This partition will be created during next topology event,
-                    // which obviously has not happened at this point.
-                    if (locPart == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping local partition afterExchange (will not create): " + p);
-
-                        continue;
-                    }
-
-                    GridDhtPartitionState state = locPart.state();
-
-                    if (state == MOVING) {
-                        if (cctx.preloadEnabled()) {
-                            Collection<ClusterNode> owners = owners(p);
-
-                            // If there are no other owners, then become an owner.
-                            if (F.isEmpty(owners)) {
-                                boolean owned = locPart.own();
-
-                                // Message fixed: '=' was missing after 'cacheName'.
-                                assert owned : "Failed to own partition [cacheName=" + cctx.name() + ", locPart=" +
-                                    locPart + ']';
-
-                                updateLocal(p, loc.id(), locPart.state(), updateSeq);
-
-                                changed = true;
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Owned partition: " + locPart);
-                            }
-                            else if (log.isDebugEnabled())
-                                log.debug("Will not own partition (there are owners to preload from) [locPart=" +
-                                    locPart + ", owners = " + owners + ']');
-                        }
-                        else
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
-                    }
-                }
-                else {
-                    if (locPart != null) {
-                        GridDhtPartitionState state = locPart.state();
-
-                        if (state == MOVING) {
-                            locPart.rent(false);
-
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
-
-                            changed = true;
-
-                            if (log.isDebugEnabled())
-                                log.debug("Evicting moving partition (it does not belong to affinity): " + locPart);
-                        }
-                    }
-                }
-            }
-
-            consistencyCheck();
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-
-        return changed;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to the private overload with the update-sequence flag set to {@code true}.
-     */
-    @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create)
-        throws GridDhtInvalidPartitionException {
-        GridDhtLocalPartition<K, V> part = localPartition(p, topVer, create, true);
-
-        return part;
-    }
-
-    /**
-     * Looks up the local partition with the given number, optionally creating it.
-     * <p>
-     * An existing partition in EVICTED state is removed from the local map first; after that it is
-     * treated as missing. A new partition is only created if it belongs to the local node
-     * for the given topology version.
-     *
-     * @param p Partition number.
-     * @param topVer Topology version used for the affinity check.
-     * @param create Create flag.
-     * @param updateSeq Whether to increment the update sequence when a new partition is created.
-     * @return Local partition, or {@code null} if it does not exist (or was evicted) and {@code create}
-     *      is {@code false}.
-     * @throws GridDhtInvalidPartitionException If {@code create} is {@code true} and the partition
-     *      does not belong to the local node.
-     */
-    private GridDhtLocalPartition<K, V> localPartition(int p, long topVer, 
boolean create, boolean updateSeq) {
-        // Retried whenever an evicted partition had to be removed before re-creation.
-        while (true) {
-            boolean belongs = cctx.affinity().localNode(p, topVer);
-
-            GridDhtLocalPartition<K, V> loc = locParts.get(p);
-
-            if (loc != null && loc.state() == EVICTED) {
-                // Conditional remove: only drops the mapping if it still points to this instance.
-                locParts.remove(p, loc);
-
-                if (!create)
-                    return null;
-
-                if (!belongs)
-                    throw new GridDhtInvalidPartitionException(p, "Adding 
entry to evicted partition [part=" + p +
-                        ", topVer=" + topVer + ", this.topVer=" + this.topVer 
+ ']');
-
-                continue;
-            }
-
-            if (loc == null && create) {
-                if (!belongs)
-                    throw new GridDhtInvalidPartitionException(p, "Creating 
partition which does not belong [part=" +
-                        p + ", topVer=" + topVer + ", this.topVer=" + 
this.topVer + ']');
-
-                // Creation is the only part of this method done under the write lock.
-                lock.writeLock().lock();
-
-                try {
-                    GridDhtLocalPartition<K, V> old = locParts.putIfAbsent(p,
-                        loc = new GridDhtLocalPartition<>(cctx, p));
-
-                    // Another thread created the partition first - use its instance.
-                    if (old != null)
-                        loc = old;
-                    else {
-                        if (updateSeq)
-                            this.updateSeq.incrementAndGet();
-
-                        if (log.isDebugEnabled())
-                            log.debug("Created local partition: " + loc);
-                    }
-                }
-                finally {
-                    lock.writeLock().unlock();
-                }
-            }
-
-            return loc;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean 
create) {
-        return localPartition(cctx.affinity().partition(key), -1, create);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns a new list holding a copy of the local partitions at the time of the call,
-     * so later changes to the local partition map are not reflected in it.
-     */
-    @Override public List<GridDhtLocalPartition<K, V>> localPartitions() {
-        // Array-backed copy: no per-element node allocation and random access for callers.
-        return new ArrayList<>(locParts.values());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridDhtLocalPartition<K, V>> 
currentLocalPartitions() {
-        return locParts.values();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Finds (creating if needed) the local partition for the entry key and notifies it about the added entry.
-     */
-    @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) {
-        /*
-         * Make sure not to acquire any locks here as this method
-         * may be called from sensitive synchronization blocks.
-         * ===================================================
-         */
-
-        GridDhtLocalPartition<K, V> part = localPartition(cctx.affinity().partition(e.key()), topVer, true);
-
-        assert part != null;
-
-        part.onAdded(e);
-
-        return part;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onRemoved(GridDhtCacheEntry<K, V> e) {
-        /*
-         * Make sure not to acquire any locks here as this method
-         * may be called from sensitive synchronization blocks.
-         * ===================================================
-         */
-
-        GridDhtLocalPartition<K, V> loc = 
localPartition(cctx.affinity().partition(e.key()), topologyVersion(), false);
-
-        if (loc != null)
-            loc.onRemoved(e);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Built under the read lock from the local node ID, the current update sequence
-     * and a read-only view of the local partitions.
-     */
-    @Override public GridDhtPartitionMap localPartitionMap() {
-        Lock rdLock = lock.readLock();
-
-        rdLock.lock();
-
-        try {
-            UUID locId = cctx.nodeId();
-
-            long seq = updateSeq.get();
-
-            return new GridDhtPartitionMap(locId, seq, F.viewReadOnly(locParts, CU.<K, V>part2state()), true);
-        }
-        finally {
-            rdLock.unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns the affinity nodes of the partition, extended with every non-affinity node that is listed
-     * for the partition in OWNING, MOVING or RENTING state, is known to discovery and (for a non-negative
-     * {@code topVer}) has order not greater than {@code topVer}. If there is nothing to add, the affinity
-     * collection itself is returned.
-     */
-    @Override public Collection<ClusterNode> nodes(int p, long topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
-        Lock rdLock = lock.readLock();
-
-        rdLock.lock();
-
-        try {
-            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", node2part=" + node2part + ']';
-
-            Collection<UUID> partNodeIds = part2node.get(p);
-
-            if (F.isEmpty(partNodeIds))
-                return affNodes;
-
-            Collection<UUID> affIds = new HashSet<>(F.viewReadOnly(affNodes, F.node2id()));
-
-            // Allocated lazily, only when the first extra node is found.
-            Collection<ClusterNode> res = null;
-
-            for (UUID id : partNodeIds) {
-                if (affIds.contains(id) || !hasState(p, id, OWNING, MOVING, RENTING))
-                    continue;
-
-                ClusterNode node = cctx.discovery().node(id);
-
-                if (node == null || (topVer >= 0 && node.order() > topVer))
-                    continue;
-
-                if (res == null) {
-                    res = new ArrayList<>(affNodes.size() + 2);
-
-                    res.addAll(affNodes);
-                }
-
-                res.add(node);
-            }
-
-            return res == null ? affNodes : res;
-        }
-        finally {
-            rdLock.unlock();
-        }
-    }
-
-    /**
-     * Collects nodes that are listed for the partition in one of the given states.
-     *
-     * @param p Partition.
-     * @param topVer Topology version ({@code -1} for all nodes). For a positive version only nodes
-     *      returned by {@code CU.allNodes(cctx, topVer)} are considered.
-     * @param state Partition state.
-     * @param states Additional partition states.
-     * @return List of nodes for the partition (empty list if no node IDs are mapped to it).
-     */
-    private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
-
-        Lock rdLock = lock.readLock();
-
-        rdLock.lock();
-
-        try {
-            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", allIds=" + allIds + ", node2part=" + node2part + ']';
-
-            Collection<UUID> partNodeIds = part2node.get(p);
-
-            // Node IDs can be null if both, primary and backup, nodes disappear.
-            if (partNodeIds == null || partNodeIds.isEmpty())
-                return Collections.emptyList();
-
-            List<ClusterNode> res = new ArrayList<>(partNodeIds.size());
-
-            for (UUID id : partNodeIds) {
-                if (topVer > 0 && !allIds.contains(id))
-                    continue;
-
-                if (!hasState(p, id, state, states))
-                    continue;
-
-                ClusterNode node = cctx.discovery().node(id);
-
-                if (node == null)
-                    continue;
-
-                if (topVer < 0 || node.order() <= topVer)
-                    res.add(node);
-            }
-
-            return res;
-        }
-        finally {
-            rdLock.unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * With preloading enabled only nodes in OWNING state are returned;
-     * otherwise nodes in OWNING or MOVING state are returned.
-     */
-    @Override public List<ClusterNode> owners(int p, long topVer) {
-        return cctx.preloadEnabled() ? nodes(p, topVer, OWNING) : ownersAndMoving(p, topVer);
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<ClusterNode> owners(int p) {
-        return owners(p, -1);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * With preloading enabled only nodes in MOVING state are returned;
-     * otherwise nodes in OWNING or MOVING state are returned.
-     */
-    @Override public List<ClusterNode> moving(int p) {
-        return cctx.preloadEnabled() ? nodes(p, -1, MOVING) : ownersAndMoving(p, -1);
-    }
-
-    /**
-     * Collects nodes that hold the partition in either OWNING or MOVING state.
-     *
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return List of nodes in state OWNING or MOVING.
-     */
-    private List<ClusterNode> ownersAndMoving(int p, long topVer) {
-        List<ClusterNode> res = nodes(p, topVer, OWNING, MOVING);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long updateSequence() {
-        return updateSeq.get();
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Under the read lock creates a new full map from the current one, keeping its node ID, node order
-     * and update sequence. The current map is asserted to be set and valid.
-     */
-    @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) {
-        Lock rdLock = lock.readLock();
-
-        rdLock.lock();
-
-        try {
-            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']';
-
-            GridDhtPartitionFullMap cur = node2part;
-
-            return new GridDhtPartitionFullMap(cur.nodeId(), cur.nodeOrder(), cur.updateSequence(), cur, onlyActive);
-        }
-        finally {
-            rdLock.unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Applies a full partition map under the write lock. The update is ignored if the exchange ID is not
-     * newer than the last recorded one, or if the current full map is not older than the received one.
-     * Note that the passed-in {@code partMap} is modified in place and becomes the current full map.
-     *
-     * @param exchId Exchange ID (may be {@code null}).
-     * @param partMap Received full partition map.
-     * @return Local partition map if the eviction check changed local partitions, {@code null} otherwise
-     *      (including the case when the update was ignored).
-     */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap update(@Nullable 
GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionFullMap partMap) {
-        // NOTE(review): 'parts=' prints the current map (fullMapString() renders node2part), not the
-        // incoming partMap, and it is read before the write lock is taken - confirm this is intended.
-        if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchId=" + exchId + ", 
parts=" + fullMapString() + ']');
-
-        lock.writeLock().lock();
-
-        try {
-            // An exchange with the same or a greater ID has already been recorded.
-            if (exchId != null && lastExchangeId != null && 
lastExchangeId.compareTo(exchId) >= 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for full partition map update 
(will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
-
-                return null;
-            }
-
-            // Current full map compares as equal to or greater than the received one.
-            if (node2part != null && node2part.compareTo(partMap) >= 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale partition map for full partition map 
update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + 
node2part + ", newMap=" + partMap + ']');
-
-                return null;
-            }
-
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            if (exchId != null)
-                lastExchangeId = exchId;
-
-            if (node2part != null) {
-                for (GridDhtPartitionMap part : node2part.values()) {
-                    GridDhtPartitionMap newPart = partMap.get(part.nodeId());
-
-                    // If for some nodes current partition has a newer map,
-                    // then we keep the newer value.
-                    if (newPart != null && newPart.updateSequence() < 
part.updateSequence()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update 
map [exchId=" + exchId + ", curPart=" +
-                                mapString(part) + ", newPart=" + 
mapString(newPart) + ']');
-
-                        partMap.put(part.nodeId(), part);
-                    }
-                }
-
-                // Drop entries of nodes that discovery no longer reports as alive.
-                // Note that this only happens when a previous full map existed.
-                for (Iterator<UUID> it = partMap.keySet().iterator(); 
it.hasNext();) {
-                    UUID nodeId = it.next();
-
-                    if (!cctx.discovery().alive(nodeId)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Removing left node from full map update 
[nodeId=" + nodeId + ", partMap=" +
-                                partMap + ']');
-
-                        it.remove();
-                    }
-                }
-            }
-
-            // The (possibly modified) passed-in instance becomes the current full map.
-            node2part = partMap;
-
-            // Rebuild the partition-to-node index from scratch.
-            Map<Integer, Set<UUID>> p2n = new 
HashMap<>(cctx.affinity().partitions(), 1.0f);
-
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
-                for (Integer p : e.getValue().keySet()) {
-                    Set<UUID> ids = p2n.get(p);
-
-                    if (ids == null)
-                        // Initialize HashSet to size 3 in anticipation that 
there won't be
-                        // more than 3 nodes per partitions.
-                        p2n.put(p, ids = U.newHashSet(3));
-
-                    ids.add(e.getKey());
-                }
-            }
-
-            part2node = p2n;
-
-            boolean changed = checkEvictions(updateSeq);
-
-            consistencyCheck();
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map after full update: " + 
fullMapString());
-
-            // Local map is returned only when the eviction check changed local partitions.
-            return changed ? localPartitionMap() : null;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Applies a single node partition map. The update is ignored if the sender is not alive, if the
-     * exchange ID is older than the last recorded one, or if the map currently held for that node has
-     * the same or a greater update sequence.
-     *
-     * @param exchId Exchange ID (may be {@code null}).
-     * @param parts Partition map of a single node.
-     * @return Local partition map if the mapping changed, {@code null} otherwise
-     *      (including the case when the update was ignored).
-     */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap update(@Nullable 
GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts) {
-        if (log.isDebugEnabled())
-            log.debug("Updating single partition map [exchId=" + exchId + ", 
parts=" + mapString(parts) + ']');
-
-        // Checked before the write lock is taken.
-        if (!cctx.discovery().alive(parts.nodeId())) {
-            if (log.isDebugEnabled())
-                log.debug("Received partition update for non-existing node 
(will ignore) [exchId=" + exchId +
-                    ", parts=" + parts + ']');
-
-            return null;
-        }
-
-        lock.writeLock().lock();
-
-        try {
-            // Unlike the full map update, an exchange ID equal to the last one is accepted here.
-            if (lastExchangeId != null && exchId != null && 
lastExchangeId.compareTo(exchId) > 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map 
update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
-
-                return null;
-            }
-
-            if (exchId != null)
-                lastExchangeId = exchId;
-
-            if (node2part == null)
-                // Create invalid partition map.
-                node2part = new GridDhtPartitionFullMap();
-
-            GridDhtPartitionMap cur = node2part.get(parts.nodeId());
-
-            if (cur != null && cur.updateSequence() >= parts.updateSequence()) 
{
-                if (log.isDebugEnabled())
-                    log.debug("Stale update sequence for single partition map 
update (will ignore) [exchId=" + exchId +
-                        ", curSeq=" + cur.updateSequence() + ", newSeq=" + 
parts.updateSequence() + ']');
-
-                return null;
-            }
-
-            long updateSeq = this.updateSeq.incrementAndGet();
-
-            // Replace the full map with a new instance created from the old one and the new sequence.
-            node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
-
-            boolean changed = false;
-
-            if (cur == null || !cur.equals(parts))
-                changed = true;
-
-            node2part.put(parts.nodeId(), parts);
-
-            // Only the outer map is copied; the per-partition ID sets are shared with the
-            // previous map instance and are modified in place below.
-            part2node = new HashMap<>(part2node);
-
-            // Add new mappings.
-            for (Integer p : parts.keySet()) {
-                Set<UUID> ids = part2node.get(p);
-
-                if (ids == null)
-                    // Initialize HashSet to size 3 in anticipation that there 
won't be
-                    // more than 3 nodes per partition.
-                    part2node.put(p, ids = U.newHashSet(3));
-
-                changed |= ids.add(parts.nodeId());
-            }
-
-            // Remove obsolete mappings.
-            if (cur != null) {
-                // Partitions present in the previous map of this node but absent from the new one.
-                for (Integer p : F.view(cur.keySet(), 
F0.notIn(parts.keySet()))) {
-                    Set<UUID> ids = part2node.get(p);
-
-                    if (ids != null)
-                        changed |= ids.remove(parts.nodeId());
-                }
-            }
-
-            changed |= checkEvictions(updateSeq);
-
-            consistencyCheck();
-
-            if (log.isDebugEnabled())
-                log.debug("Partition map after single update: " + 
fullMapString());
-
-            return changed ? localPartitionMap() : null;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * Checks if any of the local partitions need to be evicted and starts renting those that do.
-     * Must be called with the write lock held by the current thread (asserted).
-     * <p>
-     * Only active partitions for which the local node is not an affinity node are considered.
-     *
-     * @param updateSeq Update sequence passed to {@code updateLocal(...)} for every rented partition.
-     * @return {@code True} if at least one local partition was rented.
-     */
-    private boolean checkEvictions(long updateSeq) {
-        assert lock.isWriteLockedByCurrentThread();
-
-        boolean changed = false;
-
-        UUID locId = cctx.nodeId();
-
-        for (GridDhtLocalPartition<K, V> part : locParts.values()) {
-            GridDhtPartitionState state = part.state();
-
-            if (state.active()) {
-                int p = part.id();
-
-                List<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
-                // Local node is not among affinity nodes of this partition.
-                if (!affNodes.contains(cctx.localNode())) {
-                    Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, 
OWNING));
-
-                    // If all affinity nodes are owners, then evict partition 
from local node.
-                    if (nodeIds.containsAll(F.nodeIds(affNodes))) {
-                        part.rent(false);
-
-                        updateLocal(part.id(), locId, part.state(), updateSeq);
-
-                        changed = true;
-
-                        if (log.isDebugEnabled())
-                            log.debug("Evicted local partition (all affinity 
nodes are owners): " + part);
-                    }
-                    else {
-                        int ownerCnt = nodeIds.size();
-                        int affCnt = affNodes.size();
-
-                        // More owners than affinity nodes: the surplus owners that come first
-                        // in the sorted list are the ones to evict.
-                        if (ownerCnt > affCnt) {
-                            List<ClusterNode> sorted = new 
ArrayList<>(cctx.discovery().nodes(nodeIds));
-
-                            // Sort by node orders in ascending order.
-                            Collections.sort(sorted, CU.nodeComparator(true));
-
-                            // Computed from the resolved node list, not from ownerCnt.
-                            int diff = sorted.size() - affCnt;
-
-                            // Rent the local partition only if the local node is among the first 'diff' nodes.
-                            for (int i = 0; i < diff; i++) {
-                                ClusterNode n = sorted.get(i);
-
-                                if (locId.equals(n.id())) {
-                                    part.rent(false);
-
-                                    updateLocal(part.id(), locId, 
part.state(), updateSeq);
-
-                                    changed = true;
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Evicted local partition 
(this node is oldest non-affinity node): " +
-                                            part);
-
-                                    break;
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        return changed;
-    }
-
-    /**
-     * Updates value for single partition.
-     *
-     * @param p Partition.
-     * @param nodeId Node ID.
-     * @param state State.
-     * @param updateSeq Update sequence.
-     */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, 
long updateSeq) {
-        assert lock.isWriteLockedByCurrentThread();
-        assert nodeId.equals(cctx.nodeId());
-
-        // In case a node joins, get the topology as of the time that node joined.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
-
-        // If this node became the oldest node.
-        if (oldest.id().equals(cctx.nodeId())) {
-            long seq = node2part.updateSequence();
-
-            if (seq != updateSeq) {
-                if (seq > updateSeq) {
-                    if (this.updateSeq.get() < seq) {
-                        // Update global counter if necessary.
-                        boolean b = 
this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);
-
-                        assert b : "Invalid update sequence [updateSeq=" + 
updateSeq + ", seq=" + seq +
-                            ", curUpdateSeq=" + this.updateSeq.get() + ", 
node2part=" + node2part.toFullString() + ']';
-
-                        updateSeq = seq + 1;
-                    }
-                    else
-                        updateSeq = seq;
-                }
-
-                node2part.updateSequence(updateSeq);
-            }
-        }
-
-        GridDhtPartitionMap map = node2part.get(nodeId);
-
-        if (map == null)
-            node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, 
updateSeq,
-                Collections.<Integer, GridDhtPartitionState>emptyMap(), 
false));
-
-        map.updateSequence(updateSeq);
-
-        map.put(p, state);
-
-        Set<UUID> ids = part2node.get(p);
-
-        if (ids == null)
-            part2node.put(p, ids = U.newHashSet(3));
-
-        ids.add(nodeId);
-    }
-
-    /**
-     * @param nodeId Node to remove.
-     */
-    private void removeNode(UUID nodeId) {
-        assert nodeId != null;
-        assert lock.writeLock().isHeldByCurrentThread();
-
-        ClusterNode oldest = CU.oldest(cctx, topVer);
-
-        ClusterNode loc = cctx.localNode();
-
-        if (node2part != null) {
-            if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) {
-                updateSeq.setIfGreater(node2part.updateSequence());
-
-                node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), 
updateSeq.incrementAndGet(),
-                    node2part, false);
-            }
-            else
-                node2part = new GridDhtPartitionFullMap(node2part, 
node2part.updateSequence());
-
-            part2node = new HashMap<>(part2node);
-
-            GridDhtPartitionMap parts = node2part.remove(nodeId);
-
-            if (parts != null) {
-                for (Integer p : parts.keySet()) {
-                    Set<UUID> nodeIds = part2node.get(p);
-
-                    if (nodeIds != null) {
-                        nodeIds.remove(nodeId);
-
-                        if (nodeIds.isEmpty())
-                            part2node.remove(p);
-                    }
-                }
-            }
-
-            consistencyCheck();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean own(GridDhtLocalPartition<K, V> part) {
-        ClusterNode loc = cctx.localNode();
-
-        lock.writeLock().lock();
-
-        try {
-            if (part.own()) {
-                updateLocal(part.id(), loc.id(), part.state(), 
updateSeq.incrementAndGet());
-
-                consistencyCheck();
-
-                return true;
-            }
-
-            consistencyCheck();
-
-            return false;
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onEvicted(GridDhtLocalPartition<K, V> part, boolean 
updateSeq) {
-        assert updateSeq || lock.isWriteLockedByCurrentThread();
-
-        lock.writeLock().lock();
-
-        try {
-            assert part.state() == EVICTED;
-
-            long seq = updateSeq ? this.updateSeq.incrementAndGet() : 
this.updateSeq.get();
-
-            updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
-
-            consistencyCheck();
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) {
-        lock.readLock().lock();
-
-        try {
-            return node2part.get(nodeId);
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void printMemoryStats(int threshold) {
-        X.println(">>>  Cache partition topology stats [grid=" + 
cctx.gridName() + ", cache=" + cctx.name() + ']');
-
-        for (GridDhtLocalPartition part : locParts.values()) {
-            int size = part.size();
-
-            if (size >= threshold)
-                X.println(">>>   Local partition [part=" + part.id() + ", 
size=" + size + ']');
-        }
-    }
-
-    /**
-     * @param p Partition.
-     * @param nodeId Node ID.
-     * @param match State to match.
-     * @param matches Additional states.
-     * @return {@code True} if the partition state on the given node equals {@code match} or any of {@code matches}.
-     */
-    private boolean hasState(final int p, @Nullable UUID nodeId, final 
GridDhtPartitionState match,
-        final GridDhtPartitionState... matches) {
-        if (nodeId == null)
-            return false;
-
-        GridDhtPartitionMap parts = node2part.get(nodeId);
-
-        // Partition map can be null if node has been removed.
-        if (parts != null) {
-            GridDhtPartitionState state = parts.get(p);
-
-            if (state == match)
-                return true;
-
-            if (matches != null && matches.length > 0)
-                for (GridDhtPartitionState s : matches)
-                    if (state == s)
-                        return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Checks consistency after all operations.
-     */
-    private void consistencyCheck() {
-        if (CONSISTENCY_CHECK) {
-            assert lock.writeLock().isHeldByCurrentThread();
-
-            if (node2part == null)
-                return;
-
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : 
node2part.entrySet()) {
-                for (Integer p : e.getValue().keySet()) {
-                    Set<UUID> nodeIds = part2node.get(p);
-
-                    assert nodeIds != null : "Failed consistency check [part=" 
+ p + ", nodeId=" + e.getKey() + ']';
-                    assert nodeIds.contains(e.getKey()) : "Failed consistency 
check [part=" + p + ", nodeId=" +
-                        e.getKey() + ", nodeIds=" + nodeIds + ']';
-                }
-            }
-
-            for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
-                for (UUID nodeId : e.getValue()) {
-                    GridDhtPartitionMap map = node2part.get(nodeId);
-
-                    assert map != null : "Failed consistency check [part=" + 
e.getKey() + ", nodeId=" + nodeId + ']';
-                    assert map.containsKey(e.getKey()) : "Failed consistency 
check [part=" + e.getKey() +
-                        ", nodeId=" + nodeId + ']';
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
deleted file mode 100644
index 21a39d7..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.managers.discovery.*;
-
-/**
- * Future that implements a barrier after which dht topology is safe to use. 
Topology is considered to be
- * safe to use when all transactions that involve moving primary partitions 
are completed and partition map
- * exchange is also completed.
- * <p/>
- * When a new transaction is started, it will wait for this future before 
acquiring new locks on particular
- * topology version.
- */
-public interface GridDhtTopologyFuture extends IgniteFuture<Long> {
-    /**
-     * Gets a topology snapshot for the topology version represented by the 
future. Note that by the time
-     * partition exchange completes some nodes from the snapshot may leave the 
grid. One should use discovery
-     * service to check if the node is valid.
-     * <p/>
-     * This method will block until the topology future is ready.
-     *
-     * @return Topology snapshot for particular topology version.
-     * @throws IgniteCheckedException If topology future failed.
-     */
-    public GridDiscoveryTopologySnapshot topologySnapshot() throws 
IgniteCheckedException;
-}

Reply via email to