http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 10caf07..2ac1ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
-import java.util.AbstractSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
@@ -34,13 +32,15 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.CachePeekModes;
 import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable;
@@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdat
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -70,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CI3;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
@@ -86,8 +88,11 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache adapter.
@@ -96,18 +101,36 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Topology. */
-    private GridDhtPartitionTopologyImpl top;
-
-    /** Preloader. */
-    protected GridCachePreloader preldr;
-
     /** Multi tx future holder. */
     private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> 
multiTxHolder = new ThreadLocal<>();
 
     /** Multi tx futures. */
     private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new 
ConcurrentHashMap8<>();
 
+    /** Force key futures. */
+    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> 
forceKeyFuts = newMap();
+
+    /** */
+    private volatile boolean stopping;
+
+    /** Discovery listener. */
+    private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
+        @Override public void onEvent(Event evt) {
+            DiscoveryEvent e = (DiscoveryEvent)evt;
+
+            ClusterNode loc = ctx.localNode();
+
+            assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : 
e;
+
+            final ClusterNode n = e.eventNode();
+
+            assert !loc.id().equals(n.id());
+
+            for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
+                f.onDiscoveryEvent(e);
+        }
+    };
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -116,6 +139,176 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * Adds future to future map.
+     *
+     * @param fut Future to add.
+     * @return {@code False} if node cache is stopping and future was 
completed with error.
+     */
+    public boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
+        forceKeyFuts.put(fut.futureId(), fut);
+
+        if (stopping) {
+            fut.onDone(stopError());
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Removes future from future map.
+     *
+     * @param fut Future to remove.
+     */
+    public void removeFuture(GridDhtForceKeysFuture<?, ?> fut) {
+        forceKeyFuts.remove(fut.futureId(), fut);
+    }
+
+    /**
+     * @param node Node.
+     * @param msg Message.
+     */
+    protected final void processForceKeyResponse(ClusterNode node, 
GridDhtForceKeysResponse msg) {
+        GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
+
+        if (f != null)
+            f.onResult(msg);
+        else if (log.isDebugEnabled())
+            log.debug("Receive force key response for unknown future (is it 
duplicate?) [nodeId=" + node.id() +
+                ", res=" + msg + ']');
+    }
+    /**
+     * @param node Node originated request.
+     * @param msg Force keys message.
+     */
+    protected final void processForceKeysRequest(final ClusterNode node, final 
GridDhtForceKeysRequest msg) {
+        IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), 
msg.cacheId(), msg.topologyVersion());
+
+        if (fut.isDone())
+            processForceKeysRequest0(node, msg);
+        else
+            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> t) {
+                    processForceKeysRequest0(node, msg);
+                }
+            });
+    }
+
+    /**
+     * @param node Node originated request.
+     * @param msg Force keys message.
+     */
+    private void processForceKeysRequest0(ClusterNode node, 
GridDhtForceKeysRequest msg) {
+        try {
+            ClusterNode loc = ctx.localNode();
+
+            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
+                ctx.cacheId(),
+                msg.futureId(),
+                msg.miniId(),
+                ctx.deploymentEnabled());
+
+            GridDhtPartitionTopology top = ctx.topology();
+
+            for (KeyCacheObject k : msg.keys()) {
+                int p = ctx.affinity().partition(k);
+
+                GridDhtLocalPartition locPart = top.localPartition(p, 
AffinityTopologyVersion.NONE, false);
+
+                // If this node is no longer an owner.
+                if (locPart == null && !top.owners(p).contains(loc)) {
+                    res.addMissed(k);
+
+                    continue;
+                }
+
+                GridCacheEntryEx entry;
+
+                while (true) {
+                    try {
+                        entry = ctx.dht().entryEx(k);
+
+                        entry.unswap();
+
+                        GridCacheEntryInfo info = entry.info();
+
+                        if (info == null) {
+                            assert entry.obsolete() : entry;
+
+                            continue;
+                        }
+
+                        if (!info.isNew())
+                            res.addInfo(info);
+
+                        ctx.evicts().touch(entry, msg.topologyVersion());
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry: " + k);
+                    }
+                    catch (GridDhtInvalidPartitionException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Local node is no longer an owner: " + 
p);
+
+                        res.addMissed(k);
+
+                        break;
+                    }
+                }
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Sending force key response [node=" + node.id() + ", 
res=" + res + ']');
+
+            ctx.io().send(node, res, ctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Received force key request form failed node (will 
ignore) [nodeId=" + node.id() +
+                    ", req=" + msg + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to reply to force key request [nodeId=" + 
node.id() + ", req=" + msg + ']', e);
+        }
+    }
+
+    /**
+     *
+     */
+    public void dumpDebugInfo() {
+        if (!forceKeyFuts.isEmpty()) {
+            U.warn(log, "Pending force key futures [cache=" + ctx.name() + 
"]:");
+
+            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+                U.warn(log, ">>> " + fut);
+        }
+    }
+
+    @Override public void onKernalStop() {
+        super.onKernalStop();
+
+        stopping = true;
+
+        IgniteCheckedException err = stopError();
+
+        for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+            fut.onDone(err);
+
+        ctx.gridEvents().removeLocalEventListener(discoLsnr);
+    }
+
+    /**
+     * @return Node stop exception.
+     */
+    private IgniteCheckedException stopError() {
+        return new NodeStoppingException("Operation has been cancelled (cache 
or node is stopping).");
+    }
+
+    /**
      * @param nodeId Sender node ID.
      * @param res Near get response.
      */
@@ -160,7 +353,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param ctx Context.
      */
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
-        this(ctx, new GridCachePartitionedConcurrentMap(ctx));
+        this(ctx, new GridCachePartitionedConcurrentMap(ctx.group()));
     }
 
     /**
@@ -174,83 +367,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override protected void init() {
-        super.init();
-
-        top = new GridDhtPartitionTopologyImpl(ctx, entryFactory());
-    }
-
-    /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, 
new CI2<UUID, GridCacheTtlUpdateRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), 
GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() {
             @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest 
req) {
                 processTtlUpdateRequest(req);
             }
         });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() {
-        super.stop();
-
-        if (preldr != null)
-            preldr.stop();
-
-        // Clean up to help GC.
-        preldr = null;
-        top = null;
-    }
 
-    /** {@inheritDoc} */
-    @Override public void onReconnected() {
-        super.onReconnected();
-
-        ctx.affinity().onReconnected();
-
-        top.onReconnected();
-
-        if (preldr != null)
-            preldr.onReconnected();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        if (preldr != null)
-            preldr.onKernalStart();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop() {
-        super.onKernalStop();
-
-        if (preldr != null)
-            preldr.onKernalStop();
+        ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, 
EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         super.printMemoryStats();
 
-        top.printMemoryStats(1024);
-    }
-
-    /**
-     * @return Cache map entry factory.
-     */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
-        return new GridCacheMapEntryFactory() {
-            @Override public GridCacheMapEntry create(
-                GridCacheContext ctx,
-                AffinityTopologyVersion topVer,
-                KeyCacheObject key
-            ) {
-                return new GridDhtCacheEntry(ctx, topVer, key);
-            }
-        };
+        ctx.group().topology().printMemoryStats(1024);
     }
 
     /**
@@ -262,21 +393,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @return Partition topology.
      */
     public GridDhtPartitionTopology topology() {
-        return top;
+        return ctx.group().topology();
     }
 
     /** {@inheritDoc} */
     @Override public GridCachePreloader preloader() {
-        return preldr;
-    }
-
-    /**
-     * @return DHT preloader.
-     */
-    public GridDhtPreloader dhtPreloader() {
-        assert preldr instanceof GridDhtPreloader;
-
-        return (GridDhtPreloader)preldr;
+        return ctx.group().preloader();
     }
 
     /**
@@ -300,6 +422,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         if (tup != null)
             throw new IgniteCheckedException("Nested multi-update locks are 
not supported");
 
+        GridDhtPartitionTopology top = ctx.group().topology();
+
         top.readLock();
 
         GridDhtTopologyFuture topFut;
@@ -342,7 +466,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         if (tup == null)
             throw new IgniteCheckedException("Multi-update was not started or 
released twice.");
 
-        top.readLock();
+        ctx.group().topology().readLock();
 
         try {
             IgniteUuid lockId = tup.get1();
@@ -355,7 +479,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             multiFut.onDone(lockId);
         }
         finally {
-            top.readUnlock();
+            ctx.group().topology().readUnlock();
         }
     }
 
@@ -516,7 +640,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             return;
 
         try {
-            GridDhtLocalPartition part = 
top.localPartition(ctx.affinity().partition(key),
+            GridDhtLocalPartition part = 
ctx.group().topology().localPartition(ctx.affinity().partition(key),
                 AffinityTopologyVersion.NONE, true);
 
             // Reserve to make sure that partition does not get unloaded.
@@ -576,7 +700,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         long sum = 0;
 
         for (GridDhtLocalPartition p : topology().currentLocalPartitions())
-            sum += p.dataStore().size();
+            sum += p.dataStore().cacheSize(ctx.cacheId());
 
         return sum;
     }
@@ -594,7 +718,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
         for (GridDhtLocalPartition p : topology().currentLocalPartitions()) {
             if (p.primary(topVer))
-                sum += p.dataStore().size();
+                sum += p.dataStore().cacheSize(ctx.cacheId());
         }
 
         return sum;
@@ -809,7 +933,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                         res = new GridNearSingleGetResponse(ctx.cacheId(),
                             req.futureId(),
-                            req.topologyVersion(),
+                            null,
                             res0,
                             false,
                             req.addDeploymentInfo());
@@ -818,9 +942,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                             res.setContainsValue();
                     }
                     else {
-                        AffinityTopologyVersion topVer = 
ctx.shared().exchange().readyAffinityVersion();
+                        AffinityTopologyVersion topVer = 
ctx.shared().exchange().lastTopologyFuture().topologyVersion();
 
-                        assert topVer.compareTo(req.topologyVersion()) >= 0 : 
"Wrong ready topology version for " +
+                        assert topVer.compareTo(req.topologyVersion()) > 0 : 
"Wrong ready topology version for " +
                             "invalid partitions response [topVer=" + topVer + 
", req=" + req + ']';
 
                         res = new GridNearSingleGetResponse(ctx.cacheId(),
@@ -908,9 +1032,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 }
 
                 if (!F.isEmpty(fut.invalidPartitions()))
-                    res.invalidPartitions(fut.invalidPartitions(), 
ctx.shared().exchange().readyAffinityVersion());
-                else
-                    res.invalidPartitions(fut.invalidPartitions(), 
req.topologyVersion());
+                    res.invalidPartitions(fut.invalidPartitions(), 
ctx.shared().exchange().lastTopologyFuture().topologyVersion());
 
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());
@@ -1096,7 +1218,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             false);
 
         if (part != null)
-            part.onDeferredDelete(entry.key(), ver);
+            part.onDeferredDelete(entry.context().cacheId(), entry.key(), ver);
     }
 
     /**
@@ -1108,8 +1230,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         if (expVer.equals(curVer))
             return false;
 
-        Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
-        Collection<ClusterNode> cacheNodes1 = 
ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
+        Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
+        Collection<ClusterNode> cacheNodes1 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 
         if (!cacheNodes0.equals(cacheNodes1) || 
ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
             return true;
@@ -1147,7 +1269,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param topVer Specified affinity topology version.
      * @return Local entries iterator.
      */
-    public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean 
primary,
+    private Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean 
primary,
         final boolean backup,
         final boolean keepBinary,
         final AffinityTopologyVersion topVer) {
@@ -1161,7 +1283,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param topVer Specified affinity topology version.
      * @return Local entries iterator.
      */
-    public Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final 
boolean primary,
+    private Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final 
boolean primary,
         final boolean backup,
         final AffinityTopologyVersion topVer) {
         assert primary || backup;
@@ -1208,7 +1330,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 GridDhtLocalPartition part = partIt.next();
 
                                 if (primary == part.primary(topVer)) {
-                                    curIt = part.entries().iterator();
+                                    curIt = 
part.entries(ctx.cacheId()).iterator();
 
                                     break;
                                 }
@@ -1253,4 +1375,35 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             return topVer;
         }
     }
+
+    /**
+     *
+     */
+    protected abstract class MessageHandler<M> implements 
IgniteBiInClosure<UUID, M> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void apply(UUID nodeId, M msg) {
+            ClusterNode node = ctx.node(nodeId);
+
+            if (node == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Received message from failed node [node=" + 
nodeId + ", msg=" + msg + ']');
+
+                return;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Received message from node [node=" + nodeId + ", 
msg=" + msg + ']');
+
+            onMessage(node, msg);
+        }
+
+        /**
+         * @param node Node.
+         * @param msg Message.
+         */
+        protected abstract void onMessage(ClusterNode node, M msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 8c0b0c2..ebb2cfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -93,8 +93,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected long nextPartCounter() {
-        return locPart.nextUpdateCounter();
+    @Override protected long nextPartitionCounter(AffinityTopologyVersion 
topVer,
+        boolean primary,
+        @Nullable Long primaryCntr) {
+        return locPart.nextUpdateCounter(cctx.cacheId(), topVer, primary, 
primaryCntr);
     }
 
     /** {@inheritDoc} */
@@ -139,7 +141,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         assert !Thread.holdsLock(this);
 
         // Remove this entry from partition mapping.
-        cctx.dht().topology().onRemoved(this);
+        cctx.topology().onRemoved(this);
     }
 
     /**
@@ -715,8 +717,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     /**
      * @return Cache name.
      */
-    protected String cacheName() {
-        return cctx.dht().near().name();
+    protected final String cacheName() {
+        return cctx.name();
     }
 
     /** {@inheritDoc} */
@@ -726,12 +728,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
 
     /** {@inheritDoc} */
     @Override protected void incrementMapPublicSize() {
-        locPart.incrementPublicSize(this);
+        locPart.incrementPublicSize(null, this);
     }
 
     /** {@inheritDoc} */
     @Override protected void decrementMapPublicSize() {
-        locPart.decrementPublicSize(this);
+        locPart.decrementPublicSize(null, this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 458bc4a..49922fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -166,7 +166,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      * Initializes future.
      */
     void init() {
-        GridDhtFuture<Object> fut = 
cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
+        GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, 
keys.keySet(), topVer);
 
         if (fut != null) {
             if (!F.isEmpty(fut.invalidPartitions())) {
@@ -292,9 +292,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      */
     private boolean map(KeyCacheObject key) {
         try {
+            int keyPart = cctx.affinity().partition(key);
+
             GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
-                
cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) 
:
-                cache().topology().localPartition(key, false);
+                cache().topology().localPartition(keyPart, topVer, true) :
+                cache().topology().localPartition(keyPart);
 
             if (part == null)
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 9a7cfdc..1a81f6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -87,8 +87,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
     /** Topology version .*/
     private AffinityTopologyVersion topVer;
 
-    /** Retries because ownership changed. */
-    private Collection<Integer> retries;
+    /** Retry because ownership changed. */
+    private Integer retry;
 
     /** Subject ID. */
     private UUID subjId;
@@ -194,17 +194,21 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      *
      */
     private void map() {
-        if (cctx.dht().dhtPreloader().needForceKeys()) {
-            GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+        if (cctx.group().preloader().needForceKeys()) {
+            GridDhtFuture<Object> fut = cctx.group().preloader().request(
+                cctx,
                 Collections.singleton(key),
                 topVer);
 
             if (fut != null) {
                 if (!F.isEmpty(fut.invalidPartitions())) {
-                    if (retries == null)
-                        retries = new HashSet<>();
+                    assert fut.invalidPartitions().size() == 1 : 
fut.invalidPartitions();
 
-                    retries.addAll(fut.invalidPartitions());
+                    retry = F.first(fut.invalidPartitions());
+
+                    onDone((GridCacheEntryInfo)null);
+
+                    return;
                 }
 
                 fut.listen(
@@ -239,17 +243,14 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      *
      */
     private void map0() {
-        // Assign keys to primary nodes.
-        int part = cctx.affinity().partition(key);
+        assert retry == null : retry;
 
-        if (retries == null || !retries.contains(part)) {
-            if (!map(key)) {
-                retries = Collections.singleton(part);
+        if (!map(key)) {
+            retry = cctx.affinity().partition(key);
 
-                onDone((GridCacheEntryInfo)null);
+            onDone((GridCacheEntryInfo)null);
 
-                return;
-            }
+            return;
         }
 
         getAsync();
@@ -257,7 +258,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> invalidPartitions() {
-        return retries == null ? Collections.<Integer>emptyList() : retries;
+        return retry == null ? Collections.<Integer>emptyList() : 
Collections.singletonList(retry);
     }
 
     /**
@@ -266,9 +267,11 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      */
     private boolean map(KeyCacheObject key) {
         try {
+            int keyPart = cctx.affinity().partition(key);
+
             GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
-                
cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) 
:
-                cache().topology().localPartition(key, false);
+                cache().topology().localPartition(keyPart, topVer, true) :
+                cache().topology().localPartition(keyPart);
 
             if (part == null)
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index a35c168..a53e864 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -19,7 +19,10 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -30,31 +33,33 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@ -71,6 +76,17 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Key partition.
  */
 public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl 
implements Comparable<GridDhtLocalPartition>, GridReservable {
+    /** */
+    private static final GridCacheMapEntryFactory ENTRY_FACTORY = new 
GridCacheMapEntryFactory() {
+        @Override public GridCacheMapEntry create(
+            GridCacheContext ctx,
+            AffinityTopologyVersion topVer,
+            KeyCacheObject key
+        ) {
+            return new GridDhtCacheEntry(ctx, topVer, key);
+        }
+    };
+
     /** Maximum size for delete queue. */
     public static final int MAX_DELETE_QUEUE_SIZE = 
Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);
 
@@ -101,29 +117,48 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     @GridToStringExclude
     private final GridFutureAdapter<?> rent;
 
-    /** Context. */
-    private final GridCacheContext cctx;
+    /** */
+    @GridToStringExclude
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    @GridToStringExclude
+    private final CacheGroupContext grp;
 
     /** Create time. */
     @GridToStringExclude
     private final long createTime = U.currentTimeMillis();
 
     /** Eviction history. */
+    @GridToStringExclude
     private final Map<KeyCacheObject, GridCacheVersion> evictHist = new 
HashMap<>();
 
     /** Lock. */
+    @GridToStringExclude
     private final ReentrantLock lock = new ReentrantLock();
 
+    /** */
+    @GridToStringExclude
+    private final ConcurrentMap<Integer, CacheMapHolder> cacheMaps;
+
+    /** */
+    @GridToStringExclude
+    private final CacheMapHolder singleCacheEntryMap;
+
     /** Remove queue. */
+    @GridToStringExclude
     private final ConcurrentLinkedDeque8<RemovedEntryHolder> rmvQueue = new 
ConcurrentLinkedDeque8<>();
 
     /** Group reservations. */
+    @GridToStringExclude
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> 
reservations = new CopyOnWriteArrayList<>();
 
     /** */
+    @GridToStringExclude
     private final CacheDataStore store;
 
     /** Partition updates. */
+    @GridToStringExclude
     private final ConcurrentNavigableMap<Long, Boolean> updates = new 
ConcurrentSkipListMap<>();
 
     /** Last applied update. */
@@ -137,21 +172,30 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     private boolean reload;
 
     /**
-     * @param cctx Context.
+     * @param ctx Context.
+     * @param grp Cache group.
      * @param id Partition ID.
-     * @param entryFactory Entry factory.
      */
-    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") 
GridDhtLocalPartition(
-        GridCacheContext cctx,
-        int id,
-        GridCacheMapEntryFactory entryFactory
-    ) {
-        super(cctx, entryFactory, Math.max(10, 
GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions()));
+    @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+    GridDhtLocalPartition(GridCacheSharedContext ctx,
+        CacheGroupContext grp,
+        int id) {
+        super(ENTRY_FACTORY);
 
         this.id = id;
-        this.cctx = cctx;
+        this.ctx = ctx;
+        this.grp = grp;
 
-        log = U.logger(cctx.kernalContext(), logRef, this);
+        log = U.logger(ctx.kernalContext(), logRef, this);
+
+        if (grp.sharedGroup()) {
+            singleCacheEntryMap = null;
+            cacheMaps = new ConcurrentHashMap<>();
+        }
+        else {
+            singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), 
createEntriesMap());
+            cacheMaps = null;
+        }
 
         rent = new GridFutureAdapter<Object>() {
             @Override public String toString() {
@@ -159,15 +203,15 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             }
         };
 
-        int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
-            Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
+        int delQueueSize = grp.systemCache() ? 100 :
+            Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
 
         rmvQueueMaxSize = U.ceilPow2(delQueueSize);
 
         rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
 
         try {
-            store = cctx.offheap().createCacheDataStore(id);
+            store = grp.offheap().createCacheDataStore(id);
         }
         catch (IgniteCheckedException e) {
             // TODO ignite-db
@@ -176,6 +220,62 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @return Entries map.
+     */
+    private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> 
createEntriesMap() {
+        return new ConcurrentHashMap8<>(Math.max(10, 
GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()),
+            0.75f,
+            Runtime.getRuntime().availableProcessors() * 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int internalSize() {
+        if (grp.sharedGroup()) {
+            int size = 0;
+
+            for (CacheMapHolder hld : cacheMaps.values())
+                size += hld.map.size();
+
+            return size;
+        }
+
+        return singleCacheEntryMap.map.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) {
+        if (grp.sharedGroup())
+            return cacheMapHolder(cctx);
+
+        return singleCacheEntryMap;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer 
cacheId) {
+        return grp.sharedGroup() ? cacheMaps.get(cacheId) : 
singleCacheEntryMap;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @return Map holder.
+     */
+    private CacheMapHolder cacheMapHolder(GridCacheContext cctx) {
+        assert grp.sharedGroup();
+
+        CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed());
+
+        if (hld != null)
+            return hld;
+
+        CacheMapHolder  old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = 
new CacheMapHolder(cctx, createEntriesMap()));
+
+        if (old != null)
+            hld = old;
+
+        return hld;
+    }
+
+    /**
      * @return Data store.
      */
     public CacheDataStore dataStore() {
@@ -242,10 +342,10 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * @return {@code True} if partition is empty.
      */
     public boolean isEmpty() {
-        if (cctx.allowFastEviction())
+        if (grp.allowFastEviction())
             return internalSize() == 0;
 
-        return store.size() == 0 && internalSize() == 0;
+        return store.fullSize() == 0 && internalSize() == 0;
     }
 
     /**
@@ -309,6 +409,20 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param ver Version.
+     */
+    private void removeVersionedEntry(int cacheId, KeyCacheObject key, 
GridCacheVersion ver) {
+        CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : 
singleCacheEntryMap;
+
+        GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null;
+
+        if (entry != null && entry.markObsoleteVersion(ver))
+            removeEntry(entry);
+    }
+
+    /**
      *
      */
     public void cleanupRemoveQueue() {
@@ -316,10 +430,10 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             RemovedEntryHolder item = rmvQueue.pollFirst();
 
             if (item != null)
-                cctx.dht().removeVersionedEntry(item.key(), item.version());
+                removeVersionedEntry(item.cacheId(), item.key(), 
item.version());
         }
 
-        if (!cctx.isDrEnabled()) {
+        if (!grp.isDrEnabled()) {
             RemovedEntryHolder item = rmvQueue.peekFirst();
 
             while (item != null && item.expireTime() < U.currentTimeMillis()) {
@@ -328,7 +442,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                 if (item == null)
                     break;
 
-                cctx.dht().removeVersionedEntry(item.key(), item.version());
+                removeVersionedEntry(item.cacheId(), item.key(), 
item.version());
 
                 item = rmvQueue.peekFirst();
             }
@@ -336,13 +450,14 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param key Removed key.
      * @param ver Removed version.
      */
-    public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) {
+    public void onDeferredDelete(int cacheId, KeyCacheObject key, 
GridCacheVersion ver) {
         cleanupRemoveQueue();
 
-        rmvQueue.add(new RemovedEntryHolder(key, ver, rmvdEntryTtl));
+        rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl));
     }
 
     /**
@@ -434,7 +549,10 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override protected void release(int sizeChange, GridCacheEntryEx e) {
+    @Override protected void release(int sizeChange, CacheMapHolder hld, 
GridCacheEntryEx e) {
+        if (grp.sharedGroup() && sizeChange != 0)
+            hld.size.addAndGet(sizeChange);
+
         release0(sizeChange);
     }
 
@@ -482,16 +600,16 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * @return {@code true} if cas succeeds.
      */
     private boolean casState(long state, GridDhtPartitionState toState) {
-        if (cctx.shared().database().persistenceEnabled()) {
+        if (ctx.database().persistenceEnabled()) {
             synchronized (this) {
                 boolean update = this.state.compareAndSet(state, 
setPartState(state, toState));
 
                 if (update)
                     try {
-                        cctx.shared().wal().log(new 
PartitionMetaStateRecord(cctx.cacheId(), id, toState, updateCounter()));
+                        ctx.wal().log(new 
PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
                     }
                     catch (IgniteCheckedException e) {
-                        log.error("Error while writing to log", e);
+                        U.error(log, "Error while writing to log", e);
                     }
 
                 return update;
@@ -624,7 +742,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
         GridDhtPartitionState partState = getPartState(state);
 
-        if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && 
getSize(state) == 0 &&
+        if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 &&
             partState == RENTING && getReservations(state) == 0 && 
!groupReserved() &&
             casState(state, EVICTED)) {
             if (log.isDebugEnabled())
@@ -634,7 +752,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                 finishDestroy(updateSeq);
         }
         else if (partState == RENTING || shouldBeRenting())
-            cctx.preloader().evictPartitionAsync(this);
+            grp.preloader().evictPartitionAsync(this);
     }
 
     /**
@@ -710,18 +828,13 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         assert state() == EVICTED : this;
         assert evictGuard.get() == -1;
 
-        if (cctx.isDrEnabled())
-            cctx.dr().partitionEvicted(id);
-
-        cctx.continuousQueries().onPartitionEvicted(id);
-
-        cctx.dataStructures().onPartitionEvicted(id);
+        grp.onPartitionEvicted(id);
 
         destroyCacheDataStore();
 
         rent.onDone();
 
-        ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, 
updateSeq);
+        ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, 
updateSeq);
 
         clearDeferredDeletes();
     }
@@ -759,7 +872,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      */
     private void destroyCacheDataStore() {
         try {
-            cctx.offheap().destroyCacheDataStore(dataStore());
+            grp.offheap().destroyCacheDataStore(dataStore());
         }
         catch (IgniteCheckedException e) {
             log.error("Unable to destroy cache data store on partition 
eviction [id=" + id + "]", e);
@@ -778,7 +891,9 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * @return {@code True} if local node is primary for this partition.
      */
     public boolean primary(AffinityTopologyVersion topVer) {
-        return cctx.affinity().primaryByPartition(cctx.localNode(), id, 
topVer);
+        List<ClusterNode> nodes = 
grp.affinity().cachedAffinity(topVer).get(id);
+
+        return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0));
     }
 
     /**
@@ -786,14 +901,23 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * @return {@code True} if local node is backup for this partition.
      */
     public boolean backup(AffinityTopologyVersion topVer) {
-        return cctx.affinity().backupByPartition(cctx.localNode(), id, topVer);
+        List<ClusterNode> nodes = 
grp.affinity().cachedAffinity(topVer).get(id);
+
+        return nodes.indexOf(ctx.localNode()) > 0;
     }
 
     /**
+     * @param cacheId ID of the cache that initiated the counter update.
+     * @param topVer Topology version for current operation.
      * @return Next update index.
      */
-    public long nextUpdateCounter() {
-        return store.nextUpdateCounter();
+    long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, 
boolean primary, @Nullable Long primaryCntr) {
+        long nextCntr = store.nextUpdateCounter();
+
+        if (grp.sharedGroup())
+            grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? 
primaryCntr : nextCntr, topVer, primary);
+
+        return nextCntr;
     }
 
     /**
@@ -830,40 +954,128 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * @throws NodeStoppingException If node stopping.
      */
     public void clearAll() throws NodeStoppingException {
-        GridCacheVersion clearVer = cctx.versions().next();
+        GridCacheVersion clearVer = ctx.versions().next();
+
+        GridCacheObsoleteEntryExtras extras = new 
GridCacheObsoleteEntryExtras(clearVer);
+
+        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
 
-        boolean rec = 
cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
+        if (grp.sharedGroup()) {
+            for (CacheMapHolder hld : cacheMaps.values())
+                clear(hld.map, extras, rec);
+        }
+        else
+            clear(singleCacheEntryMap.map, extras, rec);
 
-        Iterator<GridCacheMapEntry> it = allEntries().iterator();
+        if (!grp.allowFastEviction()) {
+            CacheMapHolder hld = grp.sharedGroup() ? null : 
singleCacheEntryMap;
 
-        GridCacheObsoleteEntryExtras extras = new 
GridCacheObsoleteEntryExtras(clearVer);
+            try {
+                GridIterator<CacheDataRow> it0 = 
grp.offheap().partitionIterator(id);
+
+                while (it0.hasNext()) {
+                    ctx.database().checkpointReadLock();
+
+                    try {
+                        CacheDataRow row = it0.next();
+
+                        if (grp.sharedGroup() && (hld == null || 
hld.cctx.cacheId() != row.cacheId())) {
+                            hld = cacheMaps.get(row.cacheId());
+
+                            if (hld == null)
+                                continue;
+                        }
+
+                        assert hld != null;
+
+                        GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+                            hld,
+                            hld.cctx,
+                            grp.affinity().lastVersion(),
+                            row.key(),
+                            true,
+                            false);
+
+                        ctx.database().checkpointReadLock();
+
+                        try {if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+                            if (rec) {
+                                hld.cctx.events().addEvent(cached.partition(),
+                                    cached.key(),
+                                    ctx.localNodeId(),
+                                    (IgniteUuid)null,
+                                    null,
+                                    EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                    null,
+                                    false,
+                                    cached.rawGet(),
+                                    cached.hasValue(),
+                                    null,
+                                    null,
+                                    null,
+                                    false);}
+                            }
+                        }
+                        finally {
+                            ctx.database().checkpointReadUnlock();
+                        }
+                    }
+                    catch (GridDhtInvalidPartitionException e) {
+                        assert isEmpty() && state() == EVICTED : "Invalid 
error [e=" + e + ", part=" + this + ']';
+
+                        break; // Partition is already concurrently cleared 
and evicted.
+                    }
+                    finally {
+                        ctx.database().checkpointReadUnlock();
+                    }
+                }
+            }
+            catch (NodeStoppingException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to get iterator for evicted partition: " 
+ id);
+
+                rent.onDone(e);
+
+                throw e;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to get iterator for evicted partition: " 
+ id, e);
+            }
+        }
+    }
+
+    /**
+     * @param map Map to clear.
+     * @param extras Obsolete extras.
+     * @param evt Unload event flag.
+     * @throws NodeStoppingException If node stopping.
+     */
+    private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
+        GridCacheObsoleteEntryExtras extras,
+        boolean evt) throws NodeStoppingException {
+        Iterator<GridCacheMapEntry> it = map.values().iterator();
 
         while (it.hasNext()) {
             GridCacheMapEntry cached = null;
 
-            cctx.shared().database().checkpointReadLock();
+            ctx.database().checkpointReadLock();
 
             try {
                 cached = it.next();
 
-                if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+                if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) {
                     removeEntry(cached);
 
                     if (!cached.isInternal()) {
-                        if (rec) {
-                            cctx.events().addEvent(cached.partition(),
+                        if (evt) {
+                            grp.addCacheEvent(cached.partition(),
                                 cached.key(),
-                                cctx.localNodeId(),
-                                (IgniteUuid)null,
-                                null,
+                                ctx.localNodeId(),
                                 EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
                                 null,
                                 false,
                                 cached.rawGet(),
                                 cached.hasValue(),
-                                null,
-                                null,
-                                null,
                                 false);
                         }
                     }
@@ -886,66 +1098,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                 U.error(log, "Failed to clear cache entry for evicted 
partition: " + cached, e);
             }
             finally {
-                cctx.shared().database().checkpointReadUnlock();
-            }
-        }
-
-        if (!cctx.allowFastEviction()) {
-            try {
-                GridIterator<CacheDataRow> it0 = cctx.offheap().iterator(id);
-
-                while (it0.hasNext()) {
-                    try {
-                        CacheDataRow row = it0.next();
-
-                        GridCacheMapEntry cached = 
putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(),
-                            row.key(),
-                            true,
-                            false);
-
-                        cctx.shared().database().checkpointReadLock();
-
-                        try {
-                            if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
-                                if (rec) {
-                                    cctx.events().addEvent(cached.partition(),
-                                        cached.key(),
-                                        cctx.localNodeId(),
-                                        (IgniteUuid)null,
-                                        null,
-                                        EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-                                        null,
-                                        false,
-                                        cached.rawGet(),
-                                        cached.hasValue(),
-                                        null,
-                                        null,
-                                        null,
-                                        false);
-                                }
-                            }
-                        }
-                        finally {
-                            cctx.shared().database().checkpointReadUnlock();
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException e) {
-                        assert isEmpty() && state() == EVICTED : "Invalid 
error [e=" + e + ", part=" + this + ']';
-
-                        break; // Partition is already concurrently cleared 
and evicted.
-                    }
-                }
-            }
-            catch (NodeStoppingException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get iterator for evicted partition: " 
+ id);
-
-                rent.onDone(e);
-
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to get iterator for evicted partition: " 
+ id, e);
+                ctx.database().checkpointReadUnlock();
             }
         }
     }
@@ -955,7 +1108,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      */
     private void clearDeferredDeletes() {
         for (RemovedEntryHolder e : rmvQueue)
-            cctx.dht().removeVersionedEntry(e.key(), e.version());
+            removeVersionedEntry(e.cacheId(), e.key(), e.version());
     }
 
     /** {@inheritDoc} */
@@ -980,6 +1133,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtLocalPartition.class, this,
+            "grp", grp.cacheOrGroupName(),
             "state", state(),
             "reservations", reservations(),
             "empty", isEmpty(),
@@ -987,12 +1141,25 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override public int publicSize() {
+    @Override public int publicSize(int cacheId) {
+        if (grp.sharedGroup()) {
+            CacheMapHolder hld = cacheMaps.get(cacheId);
+
+            return hld != null ? hld.size.get() : 0;
+        }
+
         return getSize(state.get());
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+    @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, 
GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.incrementAndGet();
+        }
+
         while (true) {
             long state = this.state.get();
 
@@ -1002,7 +1169,14 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+    @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, 
GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.decrementAndGet();
+        }
+
         while (true) {
             long state = this.state.get();
 
@@ -1014,6 +1188,22 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * @param cacheId Cache ID.
+     */
+    void onCacheStopped(int cacheId) {
+        assert grp.sharedGroup() : grp.cacheOrGroupName();
+
+        for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); 
it.hasNext();) {
+            RemovedEntryHolder e = it.next();
+
+            if (e.cacheId() == cacheId)
+                it.remove();
+        }
+
+        cacheMaps.remove(cacheId);
+    }
+
+    /**
      * @param state Composite state.
      * @return Partition state.
      */
@@ -1068,6 +1258,9 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * Removed entry holder.
      */
     private static class RemovedEntryHolder {
+        /** */
+        private final int cacheId;
+
         /** Cache key */
         private final KeyCacheObject key;
 
@@ -1078,11 +1271,13 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         private final long expireTime;
 
         /**
+         * @param cacheId Cache ID.
          * @param key Key.
          * @param ver Entry version.
          * @param ttl TTL.
          */
-        private RemovedEntryHolder(KeyCacheObject key, GridCacheVersion ver, 
long ttl) {
+        private RemovedEntryHolder(int cacheId, KeyCacheObject key, 
GridCacheVersion ver, long ttl) {
+            this.cacheId = cacheId;
             this.key = key;
             this.ver = ver;
 
@@ -1090,6 +1285,13 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         }
 
         /**
+         * @return Cache ID.
+         */
+        int cacheId() {
+            return cacheId;
+        }
+
+        /**
          * @return Key.
          */
         KeyCacheObject key() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index ea6ca06..87abd6c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -174,7 +174,7 @@ public class GridDhtLockResponse extends 
GridDistributedLockResponse {
         }
 
         if (preloadEntries != null)
-            marshalInfos(preloadEntries, cctx);
+            marshalInfos(preloadEntries, cctx.shared(), 
cctx.cacheObjectContext());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c6715e5..4e0608d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -88,9 +88,9 @@ public interface GridDhtPartitionTopology {
     public boolean stopping();
 
     /**
-     * @return Cache ID.
+     * @return Cache group ID.
      */
-    public int cacheId();
+    public int groupId();
 
     /**
      * Pre-initializes this topology.
@@ -146,13 +146,12 @@ public interface GridDhtPartitionTopology {
     public void releasePartitions(int... parts);
 
     /**
-     * @param key Cache key.
-     * @param create If {@code true}, then partition will be created if it's 
not there.
+     * @param part Partition number.
      * @return Local partition.
      * @throws GridDhtInvalidPartitionException If partition is evicted or 
absent and
      *      does not belong to this node.
      */
-    @Nullable public GridDhtLocalPartition localPartition(Object key, boolean 
create)
+    @Nullable public GridDhtLocalPartition localPartition(int part)
         throws GridDhtInvalidPartitionException;
 
     /**

Reply via email to