http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d3042e9..88dc863 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -57,6 +57,8 @@ import 
org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDisc
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.ClusterState;
@@ -198,8 +200,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
 
-    /** Cache validation results. */
-    private volatile Map<Integer, CacheValidation> cacheValidRes;
+    /** Cache groups validation results. */
+    private volatile Map<Integer, CacheValidation> grpValidRes;
 
     /** Skip preload flag. */
     private boolean skipPreload;
@@ -237,6 +239,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     /** */
     private volatile IgniteDhtPartitionsToReloadMap partsToReload = new 
IgniteDhtPartitionsToReloadMap();
 
+    /** */
     private final AtomicBoolean done = new AtomicBoolean();
 
     /**
@@ -404,8 +407,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param cacheId Cache ID to check.
-     * @param rcvdFrom Topology version.
+     * @param cacheId Cache ID.
+     * @param rcvdFrom ID of the node the cache was received from.
      * @return {@code True} if cache was added during this exchange.
      */
     public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
@@ -413,14 +416,32 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param grpId Cache group ID.
+     * @param rcvdFrom ID of the node the cache group was received from.
+     * @return {@code True} if cache group was added during this exchange.
+     */
+    public boolean cacheGroupAddedOnExchange(int grpId, UUID rcvdFrom) {
+        return dynamicCacheGroupStarted(grpId) ||
+            (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
+    }
+
+    /**
      * @param cacheId Cache ID.
      * @return {@code True} if non-client cache was added during this exchange.
      */
-    public boolean dynamicCacheStarted(int cacheId) {
+    private boolean dynamicCacheStarted(int cacheId) {
         return exchActions != null && exchActions.cacheStarted(cacheId);
     }
 
     /**
+     * @param grpId Cache group ID.
+     * @return {@code True} if non-client cache group was added during this 
exchange.
+     */
+    public boolean dynamicCacheGroupStarted(int grpId) {
+        return exchActions != null && exchActions.cacheGroupStarting(grpId);
+    }
+
+    /**
      * @return {@code True}
      */
     public boolean onAdded() {
@@ -550,8 +571,24 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
                             exchActions = new ExchangeActions();
 
+                            Set<String> cacheNames = new HashSet<>();
+
+                            Collection<CacheGroupDescriptor> grpDescs = new 
ArrayList<>();
+
+                            for (Integer grpId : op.cacheGroupIds()) {
+                                CacheGroupContext cacheGrp = 
cctx.cache().cacheGroup(grpId);
+
+                                if (cacheGrp == null)
+                                    continue;
+
+                                
grpDescs.add(cctx.cache().cacheGroupDescriptors().get(grpId));
+
+                                for (Integer cacheId : cacheGrp.cacheIds())
+                                    
cacheNames.add(cctx.cacheContext(cacheId).name());
+                            }
+
                             List<DynamicCacheChangeRequest> destroyRequests = 
getStopCacheRequests(
-                                cctx.cache(), op.cacheNames(), 
cctx.localNodeId());
+                                cctx.cache(), cacheNames, cctx.localNodeId());
 
                             if (!F.isEmpty(destroyRequests)) { //Emulate 
destroy cache request
                                 for (DynamicCacheChangeRequest req : 
destroyRequests) {
@@ -559,6 +596,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                                         
.cacheDescriptor(CU.cacheId(req.cacheName())));
                                 }
 
+                                for (CacheGroupDescriptor grpDesc : grpDescs)
+                                    exchActions.addCacheGroupToStop(grpDesc);
+
                                 if (op.type() == SnapshotOperationType.RESTORE)
                                     cctx.cache().onCustomEvent(new 
DynamicCacheChangeBatch(destroyRequests), topVer);
 
@@ -688,11 +728,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
         try {
             if (crd != null) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (cacheCtx.isLocal())
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal())
                         continue;
 
-                    cacheCtx.topology().beforeExchange(this, !centralizedAff);
+                    grp.topology().beforeExchange(this, !centralizedAff);
                 }
             }
         }
@@ -706,28 +746,28 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void updateTopologies(boolean crd) throws IgniteCheckedException {
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            GridClientPartitionTopology clientTop = 
cctx.exchange().clearClientTopology(cacheCtx.cacheId());
+            GridClientPartitionTopology clientTop = 
cctx.exchange().clearClientTopology(grp.groupId());
 
             long updSeq = clientTop == null ? -1 : 
clientTop.lastUpdateSequence();
 
-            GridDhtPartitionTopology top = cacheCtx.topology();
+            GridDhtPartitionTopology top = grp.topology();
 
             if (crd) {
-                boolean updateTop = 
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                boolean updateTop = 
exchId.topologyVersion().equals(grp.localStartVersion());
 
                 if (updateTop && clientTop != null)
                     top.update(this, clientTop.partitionMap(true), 
clientTop.updateCounters(false), Collections.<Integer>emptySet());
             }
 
-            top.updateTopologyVersion(exchId, this, updSeq, 
stopping(cacheCtx.cacheId()));
+            top.updateTopologyVersion(exchId, this, updSeq, 
cacheGroupStopping(grp.groupId()));
         }
 
         for (GridClientPartitionTopology top : 
cctx.exchange().clientTopologies())
-            top.updateTopologyVersion(exchId, this, -1, 
stopping(top.cacheId()));
+            top.updateTopologyVersion(exchId, this, -1, 
cacheGroupStopping(top.groupId()));
     }
 
     /**
@@ -821,15 +861,19 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
         if (crd != null) {
             if (crd.isLocal()) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    boolean updateTop = !cacheCtx.isLocal() &&
-                        
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    boolean updateTop = !grp.isLocal() &&
+                        
exchId.topologyVersion().equals(grp.localStartVersion());
 
                     if (updateTop) {
                         for (GridClientPartitionTopology top : 
cctx.exchange().clientTopologies()) {
-                            if (top.cacheId() == cacheCtx.cacheId()) {
-                                cacheCtx.topology().update(this,
-                                    top.partitionMap(true),
+                            if (top.groupId() == grp.groupId()) {
+                                GridDhtPartitionFullMap fullMap = 
top.partitionMap(true);
+
+                                assert fullMap != null;
+
+                                grp.topology().update(this,
+                                    fullMap,
                                     top.updateCounters(false),
                                     Collections.<Integer>emptySet());
 
@@ -850,8 +894,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         }
         else {
             if (centralizedAff) { // Last server node failed.
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    GridAffinityAssignmentCache aff = 
cacheCtx.affinity().affinityCache();
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    GridAffinityAssignmentCache aff = grp.affinity();
 
                     aff.initialize(topologyVersion(), aff.idealAssignment());
                 }
@@ -869,11 +913,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
         assert !cctx.kernalContext().clientNode();
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            cacheCtx.preloader().onTopologyChanged(this);
+            grp.preloader().onTopologyChanged(this);
         }
 
         cctx.database().releaseHistoryForPreloading();
@@ -886,15 +930,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || 
affChangeMsg != null;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
+            if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId()))
                 continue;
 
             if (topChanged) {
                 // Partition release future is done so we can flush the 
write-behind store.
                 cacheCtx.store().forceFlush();
             }
+        }
+
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                continue;
 
-            cacheCtx.topology().beforeExchange(this, !centralizedAff);
+            grp.topology().beforeExchange(this, !centralizedAff);
         }
 
         cctx.database().beforeExchange(this);
@@ -1018,11 +1067,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
      *
      */
     private void onLeft() {
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            cacheCtx.preloader().unwindUndeploys();
+            grp.preloader().unwindUndeploys();
         }
 
         cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), 
exchId.topologyVersion());
@@ -1034,17 +1083,17 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private void warnNoAffinityNodes() {
         List<String> cachesWithoutNodes = null;
 
-        for (String name : cctx.cache().cacheNames()) {
-            if (discoCache.cacheAffinityNodes(name).isEmpty()) {
+        for (DynamicCacheDescriptor cacheDesc : 
cctx.cache().cacheDescriptors().values()) {
+            if 
(discoCache.cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) {
                 if (cachesWithoutNodes == null)
                     cachesWithoutNodes = new ArrayList<>();
 
-                cachesWithoutNodes.add(name);
+                cachesWithoutNodes.add(cacheDesc.cacheName());
 
                 // Fire event even if there is no client cache started.
                 if 
(cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
                     Event evt = new CacheEvent(
-                        name,
+                        cacheDesc.cacheName(),
                         cctx.localNode(),
                         cctx.localNode(),
                         "All server nodes have left the cluster.",
@@ -1106,10 +1155,18 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param grpId Cache group ID to check.
+     * @return {@code True} if cache group is stopping by this exchange.
+     */
+    private boolean cacheGroupStopping(int grpId) {
+        return exchActions != null && exchActions.cacheGroupStopping(grpId);
+    }
+
+    /**
      * @param cacheId Cache ID to check.
      * @return {@code True} if cache is stopping by this exchange.
      */
-    public boolean stopping(int cacheId) {
+    private boolean cacheStopping(int cacheId) {
         return exchActions != null && exchActions.cacheStopped(cacheId);
     }
 
@@ -1235,18 +1292,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
        }
 
         if (err == null && realExchange) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
                 try {
                     if (centralizedAff)
-                        cacheCtx.topology().initPartitions(this);
+                        grp.topology().initPartitions(this);
                 }
                 catch (IgniteInterruptedCheckedException e) {
                     U.error(log, "Failed to initialize partitions.", e);
                 }
+            }
 
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 GridCacheContext drCacheCtx = cacheCtx.isNear() ? 
cacheCtx.near().dht().context() : cacheCtx;
 
                 if (drCacheCtx.isDrEnabled()) {
@@ -1264,21 +1323,21 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                 discoEvt.type() == EVT_NODE_JOINED)
                 detectLostPartitions();
 
-            Map<Integer, CacheValidation> m = new 
HashMap<>(cctx.cacheContexts().size());
+            Map<Integer, CacheValidation> m = new 
HashMap<>(cctx.cache().cacheGroups().size());
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                Collection<Integer> lostParts = cacheCtx.isLocal() ?
-                    Collections.<Integer>emptyList() : 
cacheCtx.topology().lostPartitions();
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                Collection<Integer> lostParts = grp.isLocal() ?
+                    Collections.<Integer>emptyList() : 
grp.topology().lostPartitions();
 
                 boolean valid = true;
 
-                if (cacheCtx.config().getTopologyValidator() != null && 
!CU.isSystemCache(cacheCtx.name()))
-                    valid = 
cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes());
+                if (grp.topologyValidator() != null && !grp.systemCache())
+                    valid = 
grp.topologyValidator().validate(discoEvt.topologyNodes());
 
-                m.put(cacheCtx.cacheId(), new CacheValidation(valid, 
lostParts));
+                m.put(grp.groupId(), new CacheValidation(valid, lostParts));
             }
 
-            cacheValidRes = m;
+            grpValidRes = m;
         }
 
         cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, 
err, false);
@@ -1329,8 +1388,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             initFut.onDone(err == null);
 
             if (exchId.isLeft()) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                    
cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
+                for (CacheGroupContext grp : cctx.cache().cacheGroups())
+                    grp.affinityFunction().removeNode(exchId.nodeId());
             }
 
             exchActions = null;
@@ -1379,16 +1438,18 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             return new CacheInvalidStateException(
                 "Failed to perform cache operation (cluster is not activated): 
" + cctx.name());
 
-        PartitionLossPolicy partLossPlc = 
cctx.config().getPartitionLossPolicy();
+        CacheGroupContext grp = cctx.group();
 
-        if (cctx.needsRecovery() && !recovery) {
+        PartitionLossPolicy partLossPlc = 
grp.config().getPartitionLossPolicy();
+
+        if (grp.needsRecovery() && !recovery) {
             if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == 
READ_ONLY_ALL))
                 return new IgniteCheckedException("Failed to write to cache 
(cache is moved to a read-only state): " +
                     cctx.name());
         }
 
-        if (cctx.needsRecovery() || cctx.config().getTopologyValidator() != 
null) {
-            CacheValidation validation = cacheValidRes.get(cctx.cacheId());
+        if (grp.needsRecovery() || grp.topologyValidator() != null) {
+            CacheValidation validation = grpValidRes.get(grp.groupId());
 
             if (validation == null)
                 return null;
@@ -1397,7 +1458,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                 return new IgniteCheckedException("Failed to perform cache 
operation " +
                     "(cache topology is not valid): " + cctx.name());
 
-            if (recovery || !cctx.needsRecovery())
+            if (recovery || !grp.needsRecovery())
                 return null;
 
             if (key != null) {
@@ -1629,9 +1690,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         Map<Integer, Long> minCntrs = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
msgs.entrySet()) {
-            assert e.getValue().partitionUpdateCounters(top.cacheId()) != null;
+            assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
 
-            for (Map.Entry<Integer, T2<Long, Long>> e0 : 
e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) {
+            for (Map.Entry<Integer, T2<Long, Long>> e0 : 
e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
                 int p = e0.getKey();
 
                 UUID uuid = e.getKey();
@@ -1702,7 +1763,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
         Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
-        Map<Integer, Long> localReserved = partHistReserved0 != null ? 
partHistReserved0.get(top.cacheId()) : null;
+        Map<Integer, Long> localReserved = partHistReserved0 != null ? 
partHistReserved0.get(top.groupId()) : null;
 
         Set<Integer> haveHistory = new HashSet<>();
 
@@ -1723,7 +1784,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
                 if (localCntr != null && localCntr <= minCntr &&
                     maxCntrObj.nodes.contains(cctx.localNodeId())) {
-                    partHistSuppliers.put(cctx.localNodeId(), top.cacheId(), 
p, minCntr);
+                    partHistSuppliers.put(cctx.localNodeId(), top.groupId(), 
p, minCntr);
 
                     haveHistory.add(p);
 
@@ -1732,10 +1793,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             }
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : 
msgs.entrySet()) {
-                Long histCntr = 
e0.getValue().partitionHistoryCounters(top.cacheId()).get(p);
+                Long histCntr = 
e0.getValue().partitionHistoryCounters(top.groupId()).get(p);
 
                 if (histCntr != null && histCntr <= minCntr && 
maxCntrObj.nodes.contains(e0.getKey())) {
-                    partHistSuppliers.put(e0.getKey(), top.cacheId(), p, 
minCntr);
+                    partHistSuppliers.put(e0.getKey(), top.groupId(), p, 
minCntr);
 
                     haveHistory.add(p);
 
@@ -1756,7 +1817,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             Set<UUID> nodesToReload = top.setOwners(p, e.getValue().nodes, 
haveHistory.contains(p), entryLeft == 0);
 
             for (UUID nodeId : nodesToReload)
-                partsToReload.put(nodeId, top.cacheId(), p);
+                partsToReload.put(nodeId, top.groupId(), p);
         }
     }
 
@@ -1768,24 +1829,32 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             if (Thread.currentThread().isInterrupted())
                 return;
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.isLocal())
-                    cacheCtx.topology().detectLostPartitions(discoEvt);
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (!grp.isLocal())
+                    grp.topology().detectLostPartitions(discoEvt);
             }
         }
     }
 
     /**
-     *
+     * @param cacheNames Cache names.
      */
     private void resetLostPartitions(Collection<String> cacheNames) {
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.isLocal() && 
cacheNames.contains(cacheCtx.name()))
-                    cacheCtx.topology().resetLostPartitions();
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
+                    continue;
+
+                for (String cacheName : cacheNames) {
+                    if (grp.hasCache(cacheName)) {
+                        grp.topology().resetLostPartitions();
+
+                        break;
+                    }
+                }
             }
         }
     }
@@ -1800,9 +1869,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             assert partHistSuppliers.isEmpty();
 
             if (!crd.equals(discoCache.serverNodes().get(0))) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (!cacheCtx.isLocal())
-                        cacheCtx.topology().beforeExchange(this, 
!centralizedAff);
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (!grp.isLocal())
+                        grp.topology().beforeExchange(this, !centralizedAff);
                 }
             }
 
@@ -1811,13 +1880,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                     GridDhtPartitionsSingleMessage msg0 = 
(GridDhtPartitionsSingleMessage)msg;
 
                     for (Map.Entry<Integer, GridDhtPartitionMap> entry : 
msg0.partitions().entrySet()) {
-                        Integer cacheId = entry.getKey();
-                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                        Integer grpId = entry.getKey();
+                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                        GridDhtPartitionTopology top = cacheCtx != null ? 
cacheCtx.topology() :
-                            cctx.exchange().clientTopology(cacheId, this);
+                        GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
+                            cctx.exchange().clientTopology(grpId, this);
 
-                        Map<Integer, T2<Long, Long>> cntrs = 
msg0.partitionUpdateCounters(cacheId);
+                        Map<Integer, T2<Long, Long>> cntrs = 
msg0.partitionUpdateCounters(grpId);
 
                         if (cntrs != null)
                             top.applyUpdateCounters(cntrs);
@@ -1895,11 +1964,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
      */
     private void assignPartitionsStates() {
         if (cctx.database().persistenceEnabled()) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                assignPartitionStates(cacheCtx.topology());
+                assignPartitionStates(grp.topology());
             }
         }
     }
@@ -2028,20 +2097,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         partHistSuppliers.putAll(msg.partitionHistorySuppliers());
 
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : 
msg.partitions().entrySet()) {
-            Integer cacheId = entry.getKey();
+            Integer grpId = entry.getKey();
 
-            Map<Integer, T2<Long, Long>> cntrMap = 
msg.partitionUpdateCounters(cacheId);
+            Map<Integer, T2<Long, Long>> cntrMap = 
msg.partitionUpdateCounters(grpId);
 
-            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-            if (cacheCtx != null)
-                cacheCtx.topology().update(this, entry.getValue(), cntrMap,
-                    msg.partsToReload(cctx.localNodeId(), cacheId));
+            if (grp != null)
+                grp.topology().update(this, entry.getValue(), cntrMap,
+                    msg.partsToReload(cctx.localNodeId(), grpId));
             else {
                 ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
-                    cctx.exchange().clientTopology(cacheId, this).update(this, 
entry.getValue(), cntrMap, Collections.<Integer>emptySet());
+                    cctx.exchange().clientTopology(grpId, this).update(this, 
entry.getValue(), cntrMap, Collections.<Integer>emptySet());
             }
         }
     }
@@ -2055,11 +2124,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         msgs.put(node.id(), msg);
 
         for (Map.Entry<Integer, GridDhtPartitionMap> entry : 
msg.partitions().entrySet()) {
-            Integer cacheId = entry.getKey();
-            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            Integer grpId = entry.getKey();
+            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-            GridDhtPartitionTopology top = cacheCtx != null ? 
cacheCtx.topology() :
-                cctx.exchange().clientTopology(cacheId, this);
+            GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                cctx.exchange().clientTopology(grpId, this);
 
             top.update(exchId, entry.getValue());
         }
@@ -2211,13 +2280,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
                             List<ClusterNode> empty = Collections.emptyList();
 
-                            for (GridCacheContext cacheCtx : 
cctx.cacheContexts()) {
-                                List<List<ClusterNode>> affAssignment = new 
ArrayList<>(cacheCtx.affinity().partitions());
+                            for (CacheGroupContext grp : 
cctx.cache().cacheGroups()) {
+                                List<List<ClusterNode>> affAssignment = new 
ArrayList<>(grp.affinity().partitions());
 
-                                for (int i = 0; i < 
cacheCtx.affinity().partitions(); i++)
+                                for (int i = 0; i < 
grp.affinity().partitions(); i++)
                                     affAssignment.add(empty);
 
-                                
cacheCtx.affinity().affinityCache().initialize(topologyVersion(), 
affAssignment);
+                                grp.affinity().initialize(topologyVersion(), 
affAssignment);
                             }
 
                             onDone(topologyVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 94ad21e..2f8a531 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -125,6 +125,11 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         this.partsToReload = partsToReload;
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
     /**
      * @param compress {@code True} if it is possible to use compression for 
message.
      */
@@ -143,24 +148,26 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return {@code True} if message contains full map for given cache.
      */
-    public boolean containsCache(int cacheId) {
-        return parts != null && parts.containsKey(cacheId);
+    public boolean containsGroup(int grpId) {
+        return parts != null && parts.containsKey(grpId);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param fullMap Full partitions map.
      * @param dupDataCache Optional ID of cache with the same partition state 
map.
      */
-    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap 
fullMap, @Nullable Integer dupDataCache) {
+    public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap 
fullMap, @Nullable Integer dupDataCache) {
+        assert fullMap != null;
+
         if (parts == null)
             parts = new HashMap<>();
 
-        if (!parts.containsKey(cacheId)) {
-            parts.put(cacheId, fullMap);
+        if (!parts.containsKey(grpId)) {
+            parts.put(grpId, fullMap);
 
             if (dupDataCache != null) {
                 assert compress;
@@ -169,30 +176,29 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 if (dupPartsData == null)
                     dupPartsData = new HashMap<>();
 
-                dupPartsData.put(cacheId, dupDataCache);
+                dupPartsData.put(grpId, dupDataCache);
             }
         }
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, 
Long>> cntrMap) {
+    public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, 
Long>> cntrMap) {
         if (partCntrs == null)
             partCntrs = new IgniteDhtPartitionCountersMap();
 
-        partCntrs.putIfAbsent(cacheId, cntrMap);
+        partCntrs.putIfAbsent(grpId, cntrMap);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int 
cacheId) {
-        if (partCntrs != null) {
-            return partCntrs.get(cacheId);
-        }
+    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int 
grpId) {
+        if (partCntrs != null)
+            return partCntrs.get(grpId);
 
         return Collections.emptyMap();
     }
@@ -207,11 +213,11 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         return partHistSuppliers;
     }
 
-    public Set<Integer> partsToReload(UUID nodeId, int cacheId) {
+    public Set<Integer> partsToReload(UUID nodeId, int grpId) {
         if (partsToReload == null)
             return Collections.emptySet();
 
-        return partsToReload.get(nodeId, cacheId);
+        return partsToReload.get(nodeId, grpId);
     }
 
     /**
@@ -396,43 +402,43 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         }
 
         switch (writer.state()) {
-            case 6:
+            case 5:
                 if (!writer.writeMap("dupPartsData", dupPartsData, 
MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeByteArray("exsBytes", exsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeByteArray("partHistSuppliersBytes", 
partHistSuppliersBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 10:
                 if (!writer.writeByteArray("partsToReloadBytes", 
partsToReloadBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 12:
+            case 11:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -454,7 +460,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             return false;
 
         switch (reader.state()) {
-            case 6:
+            case 5:
                 dupPartsData = reader.readMap("dupPartsData", 
MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
@@ -462,7 +468,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 exsBytes = reader.readByteArray("exsBytes");
 
                 if (!reader.isLastRead())
@@ -470,7 +476,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -478,7 +484,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 partHistSuppliersBytes = 
reader.readByteArray("partHistSuppliersBytes");
 
                 if (!reader.isLastRead())
@@ -486,7 +492,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -494,7 +500,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 11:
+            case 10:
                 partsToReloadBytes = 
reader.readByteArray("partsToReloadBytes");
 
                 if (!reader.isLastRead())
@@ -502,7 +508,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
                 reader.incrementState();
 
-            case 12:
+            case 11:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -520,11 +526,9 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         return 46;
     }
 
-    //todo
-
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 9222251..1e5ea14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -111,6 +111,11 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         this.compress = compress;
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
     /**
      * @return {@code True} if sent from client node.
      */
@@ -142,23 +147,23 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void partitionUpdateCounters(int cacheId, Map<Integer, T2<Long, 
Long>> cntrMap) {
+    public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, 
Long>> cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
-        partCntrs.put(cacheId, cntrMap);
+        partCntrs.put(grpId, cntrMap);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int 
cacheId) {
+    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int 
grpId) {
         if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
+            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
 
             return res != null ? res : Collections.<Integer, T2<Long, 
Long>>emptyMap();
         }
@@ -167,35 +172,34 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cntrMap Partition history counters.
      */
-    public void partitionHistoryCounters(int cacheId, Map<Integer, Long> 
cntrMap) {
+    public void partitionHistoryCounters(int grpId, Map<Integer, Long> 
cntrMap) {
         if (cntrMap.isEmpty())
             return;
 
         if (partHistCntrs == null)
             partHistCntrs = new HashMap<>();
 
-        partHistCntrs.put(cacheId, cntrMap);
+        partHistCntrs.put(grpId, cntrMap);
     }
 
     /**
      * @param cntrMap Partition history counters.
      */
-    public void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> 
cntrMap) {
-        for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) {
+    void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap) {
+        for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet())
             partitionHistoryCounters(e.getKey(), e.getValue());
-        }
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Partition history counters.
      */
-    public Map<Integer, Long> partitionHistoryCounters(int cacheId) {
+    Map<Integer, Long> partitionHistoryCounters(int grpId) {
         if (partHistCntrs != null) {
-            Map<Integer, Long> res = partHistCntrs.get(cacheId);
+            Map<Integer, Long> res = partHistCntrs.get(grpId);
 
             return res != null ? res : Collections.<Integer, Long>emptyMap();
         }
@@ -351,37 +355,37 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         }
 
         switch (writer.state()) {
-            case 6:
+            case 5:
                 if (!writer.writeBoolean("client", client))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeMap("dupPartsData", dupPartsData, 
MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeByteArray("exBytes", exBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 10:
+            case 9:
                 if (!writer.writeByteArray("partHistCntrsBytes", 
partHistCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 11:
+            case 10:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -403,7 +407,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
             return false;
 
         switch (reader.state()) {
-            case 6:
+            case 5:
                 client = reader.readBoolean("client");
 
                 if (!reader.isLastRead())
@@ -411,7 +415,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 dupPartsData = reader.readMap("dupPartsData", 
MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
@@ -419,7 +423,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 exBytes = reader.readByteArray("exBytes");
 
                 if (!reader.isLastRead())
@@ -427,7 +431,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -435,7 +439,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 10:
+            case 9:
                 partHistCntrsBytes = 
reader.readByteArray("partHistCntrsBytes");
 
                 if (!reader.isLastRead())
@@ -443,7 +447,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
                 reader.incrementState();
 
-            case 11:
+            case 10:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -461,11 +465,9 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         return 47;
     }
 
-    //todo add ex
-
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 11;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index b4ce939..4b80ee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -48,6 +48,11 @@ public class GridDhtPartitionsSingleRequest extends 
GridDhtPartitionsAbstractMes
     }
 
     /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
     @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int 
cacheId) {
         return Collections.emptyMap();
     }
@@ -89,7 +94,7 @@ public class GridDhtPartitionsSingleRequest extends 
GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 5;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 957f93b0..82eafc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -23,30 +23,22 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -61,11 +53,8 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
@@ -77,7 +66,6 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
-import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache preloader.
@@ -89,9 +77,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     /** */
     private GridDhtPartitionTopology top;
 
-    /** Force key futures. */
-    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> 
forceKeyFuts = newMap();
-
     /** Partition suppliers. */
     private GridDhtPartitionSupplier supplier;
 
@@ -114,47 +99,15 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     private final AtomicInteger partsEvictOwning = new AtomicInteger();
 
     /** */
-    private volatile boolean stopping;
-
-    /** */
     private boolean stopped;
 
-    /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
-        @Override public void onEvent(Event evt) {
-            if (!enterBusy())
-                return;
-
-            DiscoveryEvent e = (DiscoveryEvent)evt;
-
-            try {
-                ClusterNode loc = cctx.localNode();
-
-                assert e.type() == EVT_NODE_JOINED || e.type() == 
EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED;
-
-                final ClusterNode n = e.eventNode();
-
-                assert !loc.id().equals(n.id());
-
-                for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
-                    f.onDiscoveryEvent(e);
-
-                assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() 
: "Node joined with smaller-than-local " +
-                    "order [newOrder=" + n.order() + ", locOrder=" + 
loc.order() + ']';
-            }
-            finally {
-                leaveBusy();
-            }
-        }
-    };
-
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
-        super(cctx);
+    public GridDhtPreloader(CacheGroupContext grp) {
+        super(grp);
 
-        top = cctx.dht().topology();
+        top = grp.topology();
 
         startFut = new GridFutureAdapter<>();
     }
@@ -164,37 +117,10 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("Starting DHT rebalancer...");
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class,
-            new MessageHandler<GridDhtForceKeysRequest>() {
-                @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysRequest msg) {
-                    processForceKeysRequest(node, msg);
-                }
-            });
-
-        cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class,
-            new MessageHandler<GridDhtForceKeysResponse>() {
-                @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysResponse msg) {
-                    processForceKeyResponse(node, msg);
-                }
-            });
-
-        if (!cctx.kernalContext().clientNode()) {
-            cctx.io().addHandler(cctx.cacheId(), 
GridDhtAffinityAssignmentRequest.class,
-                new MessageHandler<GridDhtAffinityAssignmentRequest>() {
-                    @Override protected void onMessage(ClusterNode node, 
GridDhtAffinityAssignmentRequest msg) {
-                        processAffinityAssignmentRequest(node, msg);
-                    }
-                });
-        }
-
-        cctx.shared().affinity().onCacheCreated(cctx);
-
-        supplier = new GridDhtPartitionSupplier(cctx);
-        demander = new GridDhtPartitionDemander(cctx);
+        supplier = new GridDhtPartitionSupplier(grp);
+        demander = new GridDhtPartitionDemander(grp);
 
         demander.start();
-
-        cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, 
EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
@@ -213,10 +139,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("DHT rebalancer onKernalStop callback.");
 
-        stopping = true;
-
-        cctx.events().removeListener(discoLsnr);
-
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
@@ -227,11 +149,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
             if (demander != null)
                 demander.stop();
 
-            IgniteCheckedException err = stopError();
-
-            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
-                fut.onDone(err);
-
             top = null;
 
             stopped = true;
@@ -266,25 +183,27 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments 
assign(GridDhtPartitionsExchangeFuture exchFut) {
         // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
+        GridDhtPartitionTopology top = grp.topology();
 
-        if (!cctx.rebalanceEnabled() || 
!cctx.shared().kernalContext().state().active())
+        if (!grp.rebalanceEnabled() || 
!grp.shared().kernalContext().state().active())
             return new GridDhtPreloaderAssignments(exchFut, 
top.topologyVersion());
 
-        int partCnt = cctx.affinity().partitions();
+        int partCnt = grp.affinity().partitions();
 
         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
             
exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-                ", cache=" + cctx.name() +
+                ", grp=" + grp.name() +
                 ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments assigns = new 
GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         AffinityTopologyVersion topVer = assigns.topologyVersion();
 
+        AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
         for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
+            if (ctx.exchange().hasPendingExchange()) {
                 if (log.isDebugEnabled())
                     log.debug("Skipping assignments creation, exchange worker 
has pending assignments: " +
                         exchFut.exchangeId());
@@ -295,7 +214,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
             }
 
             // If partition belongs to local node.
-            if (cctx.affinity().partitionLocalNode(p, topVer)) {
+            if (aff.get(p).contains(ctx.localNode())) {
                 GridDhtLocalPartition part = top.localPartition(p, topVer, 
true, true);
 
                 assert part != null;
@@ -303,11 +222,11 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
                 ClusterNode histSupplier = null;
 
-                if (cctx.shared().database().persistenceEnabled()) {
-                    UUID nodeId = 
exchFut.partitionHistorySupplier(cctx.cacheId(), p);
+                if (ctx.database().persistenceEnabled()) {
+                    UUID nodeId = 
exchFut.partitionHistorySupplier(grp.groupId(), p);
 
                     if (nodeId != null)
-                        histSupplier = cctx.discovery().node(nodeId);
+                        histSupplier = ctx.discovery().node(nodeId);
                 }
 
                 if (histSupplier != null) {
@@ -318,7 +237,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                         continue; // For.
                     }
 
-                    assert cctx.shared().database().persistenceEnabled();
+                    assert ctx.database().persistenceEnabled();
                     assert remoteOwners(p, topVer).contains(histSupplier) : 
remoteOwners(p, topVer);
 
                     GridDhtPartitionDemandMessage msg = 
assigns.get(histSupplier);
@@ -327,13 +246,13 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                         assigns.put(histSupplier, msg = new 
GridDhtPartitionDemandMessage(
                             top.updateSequence(),
                             exchFut.exchangeId().topologyVersion(),
-                            cctx.cacheId()));
+                            grp.groupId()));
                     }
 
                     msg.addPartition(p, true);
                 }
                 else {
-                    if (cctx.shared().database().persistenceEnabled()) {
+                    if (ctx.database().persistenceEnabled()) {
                         if (part.state() == RENTING || part.state() == 
EVICTED) {
                             try {
                                 part.rent(false).get();
@@ -360,13 +279,13 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                     if (picked.isEmpty()) {
                         top.own(part);
 
-                        if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        if 
(grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
                             DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
-                            cctx.events().addPreloadEvent(p,
-                                EVT_CACHE_REBALANCE_PART_DATA_LOST, 
discoEvt.eventNode(),
-                                discoEvt.type(), discoEvt.timestamp());
-                        }
+                        grp.addRebalanceEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST, 
discoEvt.eventNode(),
+                            discoEvt.type(), discoEvt.timestamp());
+                    }
 
                         if (log.isDebugEnabled())
                             log.debug("Owning partition as there are no other 
owners: " + part);
@@ -376,12 +295,12 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
                         GridDhtPartitionDemandMessage msg = assigns.get(n);
 
-                        if (msg == null) {
-                            assigns.put(n, msg = new 
GridDhtPartitionDemandMessage(
-                                top.updateSequence(),
-                                exchFut.exchangeId().topologyVersion(),
-                                cctx.cacheId()));
-                        }
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            exchFut.exchangeId().topologyVersion(),
+                            grp.groupId()));
+                    }
 
                         msg.addPartition(p, false);
                     }
@@ -403,7 +322,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
      * @return Picked owners.
      */
     private Collection<ClusterNode> pickedOwners(int p, 
AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, 
topVer);
+        Collection<ClusterNode> affNodes = 
grp.affinity().cachedAffinity(topVer).get(p);
 
         int affCnt = affNodes.size();
 
@@ -429,7 +348,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
      * @return Nodes owning this partition.
      */
     private Collection<ClusterNode> remoteOwners(int p, 
AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), 
F.remoteNodes(cctx.nodeId()));
+        return F.view(grp.topology().owners(p, topVer), 
F.remoteNodes(ctx.localNodeId()));
     }
 
     /** {@inheritDoc} */
@@ -468,7 +387,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public Runnable addAssignments(GridDhtPreloaderAssignments 
assignments,
         boolean forceRebalance,
-        Collection<String> caches,
         int cnt,
         Runnable next,
         @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
@@ -484,12 +402,12 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : 
demander.syncFuture();
+        return ctx.kernalContext().clientNode() ? startFut : 
demander.syncFuture();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
-        return cctx.kernalContext().clientNode() ? new 
GridFinishedFuture<>(true) : demander.rebalanceFuture();
+        return ctx.kernalContext().clientNode() ? new 
GridFinishedFuture<>(true) : demander.rebalanceFuture();
     }
 
     /**
@@ -516,171 +434,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /**
-     * @param node Node originated request.
-     * @param msg Force keys message.
-     */
-    private void processForceKeysRequest(final ClusterNode node, final 
GridDhtForceKeysRequest msg) {
-        IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), 
msg.cacheId(), msg.topologyVersion());
-
-        if (fut.isDone())
-            processForceKeysRequest0(node, msg);
-        else
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> t) {
-                    processForceKeysRequest0(node, msg);
-                }
-            });
-    }
-
-    /**
-     * @param node Node originated request.
-     * @param msg Force keys message.
-     */
-    private void processForceKeysRequest0(ClusterNode node, 
GridDhtForceKeysRequest msg) {
-        if (!enterBusy())
-            return;
-
-        try {
-            ClusterNode loc = cctx.localNode();
-
-            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
-                cctx.cacheId(),
-                msg.futureId(),
-                msg.miniId(),
-                cctx.deploymentEnabled());
-
-            for (KeyCacheObject k : msg.keys()) {
-                int p = cctx.affinity().partition(k);
-
-                GridDhtLocalPartition locPart = top.localPartition(p, 
AffinityTopologyVersion.NONE, false);
-
-                // If this node is no longer an owner.
-                if (locPart == null && !top.owners(p).contains(loc)) {
-                    res.addMissed(k);
-
-                    continue;
-                }
-
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = cctx.dht().entryEx(k);
-
-                        entry.unswap();
-
-                        GridCacheEntryInfo info = entry.info();
-
-                        if (info == null) {
-                            assert entry.obsolete() : entry;
-
-                            continue;
-                        }
-
-                        if (!info.isNew())
-                            res.addInfo(info);
-
-                        cctx.evicts().touch(entry, msg.topologyVersion());
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry: " + k);
-                    }
-                    catch (GridDhtInvalidPartitionException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Local node is no longer an owner: " + 
p);
-
-                        res.addMissed(k);
-
-                        break;
-                    }
-                }
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Sending force key response [node=" + node.id() + ", 
res=" + res + ']');
-
-            cctx.io().send(node, res, cctx.ioPolicy());
-        }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Received force key request form failed node (will 
ignore) [nodeId=" + node.id() +
-                    ", req=" + msg + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to reply to force key request [nodeId=" + 
node.id() + ", req=" + msg + ']', e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param node Node.
-     * @param msg Message.
-     */
-    private void processForceKeyResponse(ClusterNode node, 
GridDhtForceKeysResponse msg) {
-        if (!enterBusy())
-            return;
-
-        try {
-            GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
-
-            if (f != null)
-                f.onResult(msg);
-            else if (log.isDebugEnabled())
-                log.debug("Receive force key response for unknown future (is 
it duplicate?) [nodeId=" + node.id() +
-                    ", res=" + msg + ']');
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param node Node.
-     * @param req Request.
-     */
-    private void processAffinityAssignmentRequest(final ClusterNode node,
-        final GridDhtAffinityAssignmentRequest req) {
-        final AffinityTopologyVersion topVer = req.topologyVersion();
-
-        if (log.isDebugEnabled())
-            log.debug("Processing affinity assignment request [node=" + node + 
", req=" + req + ']');
-
-        cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-            @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                if (log.isDebugEnabled())
-                    log.debug("Affinity is ready for topology version, will 
send response [topVer=" + topVer +
-                        ", node=" + node + ']');
-
-                AffinityAssignment assignment = 
cctx.affinity().assignment(topVer);
-
-                GridDhtAffinityAssignmentResponse res = new 
GridDhtAffinityAssignmentResponse(
-                    req.futureId(),
-                    cctx.cacheId(),
-                    topVer,
-                    assignment.assignment());
-
-                if 
(cctx.affinity().affinityCache().centralizedAffinityFunction()) {
-                    assert assignment.idealAssignment() != null;
-
-                    res.idealAffinityAssignment(assignment.idealAssignment());
-                }
-
-                try {
-                    cctx.io().send(node, res, AFFINITY_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send affinity assignment response 
to remote node [node=" + node + ']', e);
-                }
-            }
-        });
-    }
-
-    /**
      * Resends partitions on partition evict within configured timeout.
      *
      * @param part Evicted partition.
@@ -693,11 +446,11 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         try {
             top.onEvicted(part, updateSeq);
 
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
-                cctx.events().addUnloadEvent(part.id());
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+                grp.addUnloadEvent(part.id());
 
             if (updateSeq)
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
         }
         finally {
             leaveBusy();
@@ -706,7 +459,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public boolean needForceKeys() {
-        if (cctx.rebalanceEnabled()) {
+        if (grp.rebalanceEnabled()) {
             IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
 
             if (rebalanceFut.isDone() && 
Boolean.TRUE.equals(rebalanceFut.result()))
@@ -717,12 +470,13 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> 
request(GridNearAtomicAbstractUpdateRequest req,
+    @Override public GridDhtFuture<Object> request(GridCacheContext cctx,
+        GridNearAtomicAbstractUpdateRequest req,
         AffinityTopologyVersion topVer) {
         if (!needForceKeys())
             return null;
 
-        return request0(req.keys(), topVer);
+        return request0(cctx, req.keys(), topVer);
     }
 
     /**
@@ -730,21 +484,27 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
      * @return Future for request.
      */
     @SuppressWarnings({"unchecked", "RedundantCast"})
-    @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> 
keys, AffinityTopologyVersion topVer) {
+    @Override public GridDhtFuture<Object> request(GridCacheContext cctx,
+        Collection<KeyCacheObject> keys,
+        AffinityTopologyVersion topVer) {
         if (!needForceKeys())
             return null;
 
-        return request0(keys, topVer);
+        return request0(cctx, keys, topVer);
     }
 
     /**
+     * @param cctx Cache context.
      * @param keys Keys to request.
      * @param topVer Topology version.
      * @return Future for request.
      */
     @SuppressWarnings({"unchecked", "RedundantCast"})
-    private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, 
AffinityTopologyVersion topVer) {
-        final GridDhtForceKeysFuture<?, ?> fut = new 
GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
+    private GridDhtFuture<Object> request0(GridCacheContext cctx, 
Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+        if (cctx.isNear())
+            cctx = cctx.near().dht().context();
+
+        final GridDhtForceKeysFuture<?, ?> fut = new 
GridDhtForceKeysFuture<>(cctx, topVer, keys);
 
         IgniteInternalFuture<?> topReadyFut = 
cctx.affinity().affinityReadyFuturex(topVer);
 
@@ -754,7 +514,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
             if (topReadyFut == null)
                 startFut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> 
syncFut) {
-                        cctx.kernalContext().closure().runLocalSafe(
+                        ctx.kernalContext().closure().runLocalSafe(
                             new GridPlainRunnable() {
                                 @Override public void run() {
                                     fut.init();
@@ -791,46 +551,19 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         demandLock.writeLock().lock();
 
         try {
-            cctx.deploy().unwind(cctx);
+            grp.unwindUndeploys();
         }
         finally {
             demandLock.writeLock().unlock();
         }
     }
 
-    /**
-     * Adds future to future map.
-     *
-     * @param fut Future to add.
-     * @return {@code False} if node cache is stopping and future was 
completed with error.
-     */
-    boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
-        forceKeyFuts.put(fut.futureId(), fut);
-
-        if (stopping) {
-            fut.onDone(stopError());
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * Removes future from future map.
-     *
-     * @param fut Future to remove.
-     */
-    void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) {
-        forceKeyFuts.remove(fut.futureId(), fut);
-    }
-
     /** {@inheritDoc} */
     @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
         partsToEvict.putIfAbsent(part.id(), part);
 
         if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 
1)) {
-            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+            ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
                 @Override public Boolean call() {
                     boolean locked = true;
 
@@ -851,7 +584,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                                         partsToEvict.put(part.id(), part);
                                 }
                                 catch (Throwable ex) {
-                                    if (cctx.kernalContext().isStopping()) {
+                                    if (ctx.kernalContext().isStopping()) {
                                         LT.warn(log, ex, "Partition eviction 
failed (current node is stopping).",
                                             false,
                                             true);
@@ -886,44 +619,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void dumpDebugInfo() {
-        if (!forceKeyFuts.isEmpty()) {
-            U.warn(log, "Pending force key futures [cache=" + cctx.name() + 
"]:");
-
-            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
-                U.warn(log, ">>> " + fut);
-        }
-
         supplier.dumpDebugInfo();
     }
-
-    /**
-     *
-     */
-    private abstract class MessageHandler<M> implements 
IgniteBiInClosure<UUID, M> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void apply(UUID nodeId, M msg) {
-            ClusterNode node = cctx.node(nodeId);
-
-            if (node == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Received message from failed node [node=" + 
nodeId + ", msg=" + msg + ']');
-
-                return;
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Received message from node [node=" + nodeId + ", 
msg=" + msg + ']');
-
-            onMessage(node, msg);
-        }
-
-        /**
-         * @param node Node.
-         * @param msg Message.
-         */
-        protected abstract void onMessage(ClusterNode node, M msg);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2d5c8a5..e6c5c10 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -103,7 +103,7 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processGetResponse(nodeId, res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index f8240f3..5b53935 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -86,11 +86,23 @@ public abstract class GridNearCacheAdapter<K, V> extends 
GridDistributedCacheAda
      * @param ctx Context.
      */
     protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) {
-        super(ctx, ctx.config().getNearConfiguration().getNearStartSize());
+        super(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
+    @Override public void start() throws IgniteCheckedException {
+        if (map == null) {
+            map = new GridCacheLocalConcurrentMap(
+                ctx,
+                entryFactory(),
+                ctx.config().getNearConfiguration().getNearStartSize());
+        }
+    }
+
+    /**
+     * @return Entry factory.
+     */
+    private GridCacheMapEntryFactory entryFactory() {
         return new GridCacheMapEntryFactory() {
             @Override public GridCacheMapEntry create(
                 GridCacheContext ctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 757bfbd..6092511 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -50,7 +50,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Get request.
  */
-public class GridNearGetRequest extends GridCacheMessage implements 
GridCacheDeployable,
+public class GridNearGetRequest extends GridCacheIdMessage implements 
GridCacheDeployable,
     GridCacheVersionable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 50af754..b4e4424 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -28,7 +28,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
@@ -45,7 +45,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Get response.
  */
-public class GridNearGetResponse extends GridCacheMessage implements 
GridCacheDeployable,
+public class GridNearGetResponse extends GridCacheIdMessage implements 
GridCacheDeployable,
     GridCacheVersionable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index f4ce1ac..edddf7d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
@@ -159,12 +160,20 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
 
             if (e instanceof IgniteTxRollbackCheckedException) {
                 if (marked) {
-                    try {
-                        tx.rollback();
-                    }
-                    catch (IgniteCheckedException ex) {
-                        U.error(log, "Failed to automatically rollback 
transaction: " + tx, ex);
-                    }
+                    tx.rollbackAsync().listen(new 
IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                        @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                            try {
+                                fut.get();
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(log, "Failed to automatically rollback 
transaction: " + tx, e);
+                            }
+
+                            onComplete();
+                        }
+                    });
+
+                    return;
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 0faa8ec..00ff4bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -23,7 +23,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  *
  */
-public class GridNearSingleGetRequest extends GridCacheMessage implements 
GridCacheDeployable {
+public class GridNearSingleGetRequest extends GridCacheIdMessage implements 
GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index 554fca1..2cb75c2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -25,7 +25,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -37,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public class GridNearSingleGetResponse extends GridCacheMessage implements 
GridCacheDeployable {
+public class GridNearSingleGetResponse extends GridCacheIdMessage implements 
GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -115,7 +115,7 @@ public class GridNearSingleGetResponse extends 
GridCacheMessage implements GridC
      * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
+        return topVer != null ? topVer : super.topologyVersion();
     }
 
     /**

Reply via email to