Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 26a1bb6a5 -> e6ebae167


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6ebae16
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6ebae16
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6ebae16

Branch: refs/heads/ignite-5075
Commit: e6ebae167b2e0f50b746830d84cc466b4a957488
Parents: 26a1bb6
Author: sboikov <[email protected]>
Authored: Thu May 4 17:42:52 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu May 4 18:41:14 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  82 +++++-------
 .../cache/CacheGroupInfrastructure.java         | 131 +++++++++++++++++--
 .../processors/cache/ClusterCachesInfo.java     |   4 +
 .../processors/cache/GridCacheContext.java      |   4 +
 .../processors/cache/GridCacheProcessor.java    |  25 ++--
 .../processors/cache/GridCacheUtils.java        |   4 +-
 .../cache/affinity/GridCacheAffinityImpl.java   |   2 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  30 ++---
 .../dht/GridDhtPartitionTopology.java           |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 107 ++++++++-------
 .../dht/preloader/GridDhtPreloader.java         |  50 -------
 .../continuous/CacheContinuousQueryHandler.java |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |   4 +-
 13 files changed, 250 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index adaa1e1..bd80bf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -288,7 +288,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         Map<Integer, Map<Integer, List<UUID>>> assignmentsChange = 
U.newHashMap(waitInfo.assignments.size());
 
         for (Map.Entry<Integer, Map<Integer, List<ClusterNode>>> e : 
waitInfo.assignments.entrySet()) {
-            Integer cacheId = e.getKey();
+            Integer grpId = e.getKey();
 
             Map<Integer, List<ClusterNode>> assignment = e.getValue();
 
@@ -297,27 +297,26 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             for (Map.Entry<Integer, List<ClusterNode>> e0 : 
assignment.entrySet())
                 assignment0.put(e0.getKey(), toIds0(e0.getValue()));
 
-            assignmentsChange.put(cacheId, assignment0);
+            assignmentsChange.put(grpId, assignment0);
         }
 
         return new CacheAffinityChangeMessage(waitInfo.topVer, 
assignmentsChange, waitInfo.deploymentIds);
     }
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    public void onCacheCreated(GridCacheContext cctx) {
-        final Integer cacheId = cctx.cacheId();
-
-        // TODO IGNITE-5075: move to group initialization?
-//        if (!caches.containsKey(cctx.cacheId())) {
-//            cctx.io().addHandler(cacheId, 
GridDhtAffinityAssignmentResponse.class,
-//                new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
-//                    @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
-//                        processAffinityAssignmentResponse(cacheId, nodeId, 
res);
-//                    }
-//                });
-//        }
+    void onCacheGroupCreated(CacheGroupInfrastructure grp) {
+        final Integer grpId = grp.groupId();
+
+        if (!grpHolders.containsKey(grp.groupId())) {
+            cctx.io().addHandler(grpId, 
GridDhtAffinityAssignmentResponse.class,
+                new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
+                    @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
+                        processAffinityAssignmentResponse(grpId, nodeId, res);
+                    }
+                });
+        }
     }
 
     /**
@@ -380,7 +379,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 nearCfg = req.nearCacheConfiguration();
             }
             else {
-                startCache = cctx.cacheContext(action.descriptor().cacheId()) 
== null &&
+                startCache = cctx.cacheContext(cacheDesc.cacheId()) == null &&
                     CU.affinityNode(cctx.localNode(), 
req.startCacheConfiguration().getNodeFilter());
             }
 
@@ -694,16 +693,16 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processAffinityAssignmentResponse(Integer cacheId, UUID 
nodeId, GridDhtAffinityAssignmentResponse res) {
+    private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + 
nodeId + ", res=" + res + ']');
 
         for (GridDhtAssignmentFetchFuture fut : 
pendingAssignmentFetchFuts.values()) {
-            if (fut.key().get1().equals(cacheId)) {
+            if (fut.key().get1().equals(grpId)) {
                 fut.onResponse(nodeId, res);
 
                 break;
@@ -992,9 +991,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         for (int i = 0; i < fetchFuts.size(); i++) {
             GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
 
-            Integer cacheId = fetchFut.key().get1();
+            Integer grpId = fetchFut.key().get1();
 
-            fetchAffinity(fut, 
cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
+            fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), 
fetchFut);
         }
     }
 
@@ -1053,11 +1052,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         boolean centralizedAff;
 
         if (lateAffAssign) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                
cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), 
fut.discoveryEvent(), fut.discoCache());
+                grp.affinity().calculate(fut.topologyVersion(), 
fut.discoveryEvent(), fut.discoCache());
             }
 
             centralizedAff = true;
@@ -1242,13 +1241,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
         if (!crd) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                boolean latePrimary = cacheCtx.rebalanceEnabled();
+                boolean latePrimary = grp.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, 
cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
+                initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, 
affCache);
             }
 
             return null;
@@ -1626,17 +1625,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
          */
         abstract boolean client();
 
+        /**
+         * @return Group ID.
+         */
         int groupId() {
             return aff.groupId();
         }
 
-//        /**
-//         * @return Cache ID.
-//         */
-//        int cacheId() {
-//            return aff.cacheId();
-//        }
-
         /**
          * @return Partitions number.
          */
@@ -1644,13 +1639,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             return aff.partitions();
         }
 
-//        /**
-//         * @return Cache name.
-//         */
-//        String name() {
-//            return aff.cacheName();
-//        }
-
         /**
          * @param fut Exchange future.
          * @return Cache topology.
@@ -1689,16 +1677,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             return false;
         }
 
-//        /** {@inheritDoc} */
-//        @Override public String name() {
-//            return cctx.name();
-//        }
-//
-//        /** {@inheritDoc} */
-//        @Override public int cacheId() {
-//            return cctx.cacheId();
-//        }
-
         /** {@inheritDoc} */
         @Override public GridDhtPartitionTopology 
topology(GridDhtPartitionsExchangeFuture fut) {
             return grp.topology();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 72a62ce..57e560f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -17,17 +17,29 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 
 /**
  *
@@ -37,7 +49,7 @@ public class CacheGroupInfrastructure {
     private GridAffinityAssignmentCache aff;
 
     /** */
-    private final int id;
+    private final int grpId;
 
     /** */
     private final CacheConfiguration ccfg;
@@ -46,24 +58,37 @@ public class CacheGroupInfrastructure {
     private final GridCacheSharedContext ctx;
 
     /** */
-    private GridDhtPartitionTopology top;
+    private final IgniteLogger log;
 
+    /** */
+    private GridDhtPartitionTopologyImpl top;
+
+    /** */
     private AffinityTopologyVersion grpStartVer;
 
+    /** */
     private AffinityTopologyVersion locStartVer;
 
     /**
-     * @param id Group ID.
+     * @param grpId Group ID.
      * @param ctx Context.
      * @param ccfg Cache configuration.
      */
-    CacheGroupInfrastructure(int id, GridCacheSharedContext ctx, 
CacheConfiguration ccfg) {
-        assert id != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", 
grpName=" + ccfg.getGroupName() + ']';
+    CacheGroupInfrastructure(GridCacheSharedContext ctx,
+        int grpId,
+        CacheConfiguration ccfg,
+        AffinityTopologyVersion grpStartVer,
+        AffinityTopologyVersion locStartVer) {
+        assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", 
grpName=" + ccfg.getGroupName() + ']';
         assert ccfg != null;
 
-        this.id = id;
+        this.grpId = grpId;
         this.ctx = ctx;
         this.ccfg = ccfg;
+        this.grpStartVer = grpStartVer;
+        this.locStartVer = locStartVer;
+
+        log = ctx.kernalContext().log(getClass());
     }
 
     public AffinityTopologyVersion groupStartVersion() {
@@ -89,12 +114,12 @@ public class CacheGroupInfrastructure {
         return aff;
     }
 
-    @Nullable public String groupName() {
+    @Nullable public String name() {
         return ccfg.getGroupName();
     }
 
     public int groupId() {
-        return id;
+        return grpId;
     }
 
     public boolean sharedGroup() {
@@ -103,12 +128,92 @@ public class CacheGroupInfrastructure {
 
     public void start() throws IgniteCheckedException {
         aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
-            groupName(),
-            id,
+            name(),
+            grpId,
             ccfg.getAffinity(),
             ccfg.getNodeFilter(),
             ccfg.getBackups(),
             ccfg.getCacheMode() == LOCAL);
+
+        if (ccfg.getCacheMode() != LOCAL) {
+            GridCacheMapEntryFactory entryFactory = new 
GridCacheMapEntryFactory() {
+                @Override public GridCacheMapEntry create(
+                    GridCacheContext ctx,
+                    AffinityTopologyVersion topVer,
+                    KeyCacheObject key,
+                    int hash,
+                    CacheObject val
+                ) {
+                    return new GridDhtCacheEntry(ctx, topVer, key, hash, val);
+                }
+            };
+
+            top = new GridDhtPartitionTopologyImpl(ctx, entryFactory);
+
+            if (!ctx.kernalContext().clientNode()) {
+                ctx.io().addHandler(groupId(), 
GridDhtAffinityAssignmentRequest.class,
+                    new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentRequest>() {
+                        @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentRequest msg) {
+                            processAffinityAssignmentRequest(nodeId, msg);
+                        }
+                    });
+            }
+        }
+
+        ctx.affinity().onCacheGroupCreated(this);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processAffinityAssignmentRequest(final UUID nodeId,
+        final GridDhtAffinityAssignmentRequest req) {
+        if (log.isDebugEnabled())
+            log.debug("Processing affinity assignment request [node=" + nodeId 
+ ", req=" + req + ']');
+
+        IgniteInternalFuture<AffinityTopologyVersion> fut = 
aff.readyFuture(req.topologyVersion());
+
+        if (fut != null) {
+            fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    processAffinityAssignmentRequest0(nodeId, req);
+                }
+            });
+        }
+        else
+            processAffinityAssignmentRequest0(nodeId, req);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    private void processAffinityAssignmentRequest0(UUID nodeId, final 
GridDhtAffinityAssignmentRequest req) {
+        AffinityTopologyVersion topVer = req.topologyVersion();
+
+        if (log.isDebugEnabled())
+            log.debug("Affinity is ready for topology version, will send 
response [topVer=" + topVer +
+                ", node=" + nodeId + ']');
+
+        AffinityAssignment assignment = aff.cachedAffinity(topVer);
+
+        GridDhtAffinityAssignmentResponse res = new 
GridDhtAffinityAssignmentResponse(grpId,
+            topVer,
+            assignment.assignment());
+
+        if (aff.centralizedAffinityFunction()) {
+            assert assignment.idealAssignment() != null;
+
+            res.idealAffinityAssignment(assignment.idealAssignment());
+        }
+
+        try {
+            ctx.io().send(nodeId, res, AFFINITY_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send affinity assignment response to 
remote node [node=" + nodeId + ']', e);
+        }
     }
 
     /**
@@ -136,9 +241,15 @@ public class CacheGroupInfrastructure {
     public void onReconnected() {
         // TODO IGNITE-5075.
         aff.onReconnected();
+
+        if (top != null)
+            top.onReconnected();
     }
 
     public GridDhtPartitionTopology topology() {
+        if (top == null)
+            throw new IllegalStateException("Topology is not initialized: " + 
groupName());
+
         return top;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 060c933..f70ea8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -927,6 +927,10 @@ class ClusterCachesInfo {
             topVer,
             caches);
 
+        CacheGroupDescriptor old = registeredCacheGrps.put(grpName, grpDesc);
+
+        assert old == null : old;
+
         ctx.discovery().addCacheGroup(grpDesc, 
startedCacheCfg.getNodeFilter(), startedCacheCfg.getCacheMode());
 
         if (exchActions != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index bfd28cf..7a4ad33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -393,6 +393,10 @@ public class GridCacheContext<K, V> implements 
Externalizable {
         itHolder = new CacheWeakQueryIteratorsHolder(log);
     }
 
+    public int groupId() {
+        return grp.groupId();
+    }
+
     /**
      * @return Cache group infrastructure.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1745d16..3769274 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1725,10 +1725,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         AffinityTopologyVersion exchTopVer)
         throws IgniteCheckedException {
         prepareCacheStart(
+            cacheDesc.groupDescriptor(),
             cacheDesc.cacheConfiguration(),
             nearCfg,
             cacheDesc.cacheType(),
-            cacheDesc.groupDescriptor().groupId(),
             cacheDesc.deploymentId(),
             cacheDesc.startTopologyVersion(),
             exchTopVer,
@@ -1748,10 +1748,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 DynamicCacheDescriptor desc = t.get1();
 
                 prepareCacheStart(
+                    desc.groupDescriptor(),
                     desc.cacheConfiguration(),
                     t.get2(),
                     desc.cacheType(),
-                    desc.groupDescriptor().groupId(),
                     desc.deploymentId(),
                     desc.startTopologyVersion(),
                     exchTopVer,
@@ -1779,10 +1779,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
                 if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
                     prepareCacheStart(
+                        desc.groupDescriptor(),
                         desc.cacheConfiguration(),
                         null,
                         desc.cacheType(),
-                        desc.groupDescriptor().groupId(),
                         desc.deploymentId(),
                         desc.startTopologyVersion(),
                         exchTopVer,
@@ -1806,10 +1806,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     private void prepareCacheStart(
+        CacheGroupDescriptor grpDesc,
         CacheConfiguration startCfg,
         @Nullable NearCacheConfiguration reqNearCfg,
         CacheType cacheType,
-        int grpId,
         IgniteUuid deploymentId,
         AffinityTopologyVersion cacheStartTopVer,
         AffinityTopologyVersion exchTopVer,
@@ -1831,10 +1831,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             }
 
             if (grp == null)
-                grp = startCacheGroup(startCfg, grpId);
+                grp = startCacheGroup(grpDesc, exchTopVer);
         }
         else
-            grp = startCacheGroup(startCfg, grpId);
+            grp = startCacheGroup(grpDesc, exchTopVer);
 
         CacheConfiguration ccfg = new CacheConfiguration(startCfg);
 
@@ -1878,14 +1878,19 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         onKernalStart(cache);
     }
 
-    private CacheGroupInfrastructure startCacheGroup(CacheConfiguration cfg0, 
int grpId) throws IgniteCheckedException {
-        CacheConfiguration ccfg = new CacheConfiguration(cfg0);
+    private CacheGroupInfrastructure startCacheGroup(CacheGroupDescriptor 
desc, AffinityTopologyVersion exchTopVer)
+        throws IgniteCheckedException {
+        CacheConfiguration ccfg = new CacheConfiguration(desc.config());
 
-        CacheGroupInfrastructure grp = new CacheGroupInfrastructure(grpId, 
sharedCtx, ccfg);
+        CacheGroupInfrastructure grp = new CacheGroupInfrastructure(sharedCtx,
+            desc.groupId(),
+            ccfg,
+            desc.startTopologyVersion(),
+            exchTopVer);
 
         grp.start();
 
-        CacheGroupInfrastructure old = cacheGrps.put(grpId, grp);
+        CacheGroupInfrastructure old = cacheGrps.put(desc.groupId(), grp);
 
         assert old == null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 2260a99..f695768 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -516,7 +516,7 @@ public class GridCacheUtils {
      * @return All nodes on which cache with the same name is started.
      */
     public static Collection<ClusterNode> affinityNodes(final GridCacheContext 
ctx) {
-        return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), 
AffinityTopologyVersion.NONE);
+        return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), 
AffinityTopologyVersion.NONE);
     }
 
     /**
@@ -527,7 +527,7 @@ public class GridCacheUtils {
      * @return Affinity nodes.
      */
     public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, 
AffinityTopologyVersion topOrder) {
-        return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), topOrder);
+        return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), 
topOrder);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 41b3281..f6032fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -196,7 +196,7 @@ public class GridCacheAffinityImpl<K, V> implements 
Affinity<K> {
         int nodesCnt;
 
         if (!cctx.isLocal())
-            nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.cacheId(), 
topVer).size();
+            nodesCnt = 
cctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer).size();
         else
             nodesCnt = 1;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 7e6ae81..2ee6f83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -96,9 +96,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Topology. */
-    private GridDhtPartitionTopologyImpl top;
-
     /** Preloader. */
     protected GridCachePreloader preldr;
 
@@ -174,13 +171,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override protected void init() {
-        super.init();
-
-        top = new GridDhtPartitionTopologyImpl(ctx, entryFactory());
-    }
-
-    /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
@@ -200,7 +190,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
 
         // Clean up to help GC.
         preldr = null;
-        top = null;
     }
 
     /** {@inheritDoc} */
@@ -209,7 +198,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
 
         ctx.affinity().onReconnected();
 
-        top.onReconnected();
+        // TODO IGNITE-5075.
+        //top.onReconnected();
 
         if (preldr != null)
             preldr.onReconnected();
@@ -235,7 +225,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     @Override public void printMemoryStats() {
         super.printMemoryStats();
 
-        top.printMemoryStats(1024);
+        ctx.group().topology().printMemoryStats(1024);
     }
 
     /**
@@ -264,7 +254,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
      * @return Partition topology.
      */
     public GridDhtPartitionTopology topology() {
-        return top;
+        return ctx.group().topology();
     }
 
     /** {@inheritDoc} */
@@ -302,6 +292,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
         if (tup != null)
             throw new IgniteCheckedException("Nested multi-update locks are 
not supported");
 
+        GridDhtPartitionTopology top = ctx.group().topology();
+
         top.readLock();
 
         GridDhtTopologyFuture topFut;
@@ -344,7 +336,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
         if (tup == null)
             throw new IgniteCheckedException("Multi-update was not started or 
released twice.");
 
-        top.readLock();
+        ctx.group().topology().readLock();
 
         try {
             IgniteUuid lockId = tup.get1();
@@ -357,7 +349,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
             multiFut.onDone(lockId);
         }
         finally {
-            top.readUnlock();
+            ctx.group().topology().readUnlock();
         }
     }
 
@@ -518,7 +510,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
             return;
 
         try {
-            GridDhtLocalPartition part = 
top.localPartition(ctx.affinity().partition(key),
+            GridDhtLocalPartition part = 
ctx.group().topology().localPartition(ctx.affinity().partition(key),
                 AffinityTopologyVersion.NONE, true);
 
             // Reserve to make sure that partition does not get unloaded.
@@ -1201,8 +1193,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
         if (expVer.equals(curVer))
             return false;
 
-        Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
-        Collection<ClusterNode> cacheNodes1 = 
ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
+        Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
+        Collection<ClusterNode> cacheNodes1 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 
         if (!cacheNodes0.equals(cacheNodes1) || 
ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
             return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index f9fd852..cf12986 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -90,7 +90,7 @@ public interface GridDhtPartitionTopology {
     /**
      * @return Cache ID.
      */
-    public int cacheId();
+    public int groupId();
 
     /**
      * Pre-initializes this topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6d45d6e..6634e98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -42,9 +42,11 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -72,7 +74,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Partition topology.
  */
 @GridToStringExclude
-class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -82,8 +84,11 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     /** */
     private static final Long ZERO = 0L;
 
-    /** Context. */
-    private final GridCacheContext<?, ?> cctx;
+    /** */
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    private final CacheGroupInfrastructure grp;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -131,23 +136,26 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     private volatile boolean treatAllPartAsLoc;
 
     /**
-     * @param cctx Context.
+     * @param ctx Context.
      * @param entryFactory Entry factory.
      */
-    GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, 
GridCacheMapEntryFactory entryFactory) {
-        assert cctx != null;
+    public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx, 
CacheGroupInfrastructure grp, GridCacheMapEntryFactory entryFactory) {
+        assert ctx != null;
+        assert grp != null;
+        assert entryFactory != null;
 
-        this.cctx = cctx;
+        this.ctx = ctx;
+        this.grp = grp;
         this.entryFactory = entryFactory;
 
-        log = cctx.logger(getClass());
+        log = ctx.logger(getClass());
 
-        locParts = new 
AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
+        locParts = new 
AtomicReferenceArray<>(grp.config().getAffinity().partitions());
     }
 
     /** {@inheritDoc} */
-    @Override public int cacheId() {
-        return cctx.cacheId();
+    @Override public int groupId() {
+        return grp.groupId();
     }
 
     /**
@@ -171,7 +179,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             topVer = AffinityTopologyVersion.NONE;
 
-            discoCache = cctx.discovery().discoCache();
+            discoCache = ctx.discovery().discoCache();
         }
         finally {
             lock.writeLock().unlock();
@@ -235,13 +243,13 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                             if (dumpCnt++ < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                                 U.warn(log, "Failed to wait for partition 
eviction [" +
                                     "topVer=" + topVer +
-                                    ", cache=" + cctx.name() +
+                                    ", group=" + grp.name() +
                                     ", part=" + part.id() +
                                     ", partState=" + part.state() +
                                     ", size=" + part.size() +
                                     ", reservations=" + part.reservations() +
                                     ", grpReservations=" + 
part.groupReserved() +
-                                    ", node=" + cctx.localNodeId() + "]");
+                                    ", node=" + ctx.localNodeId() + "]");
 
                                 if 
(IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, 
false))
                                     U.dumpThreads(log);
@@ -329,7 +337,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         AffinityTopologyVersion topVer = this.topVer;
 
         assert topVer.topologyVersion() > 0 : "Invalid topology version 
[topVer=" + topVer +
-            ", cacheName=" + cctx.name() + ']';
+            ", group=" + grp.name() + ']';
 
         return topVer;
     }
@@ -371,7 +379,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long 
updateSeq) {
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
 
@@ -379,21 +387,21 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
         assert topVer.equals(exchFut.topologyVersion()) :
             "Invalid topology [topVer=" + topVer +
-                ", cache=" + cctx.name() +
+                ", grp=" + grp.name() +
                 ", futVer=" + exchFut.topologyVersion() +
                 ", fut=" + exchFut + ']';
-        assert 
cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) :
-            "Invalid affinity [topVer=" + 
cctx.affinity().affinityTopologyVersion() +
-                ", cache=" + cctx.name() +
+        assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) :
+            "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+                ", grp=" + grp.name() +
                 ", futVer=" + exchFut.topologyVersion() +
                 ", fut=" + exchFut + ']';
 
-        List<List<ClusterNode>> aff = 
cctx.affinity().assignments(exchFut.topologyVersion());
+        List<List<ClusterNode>> aff = 
grp.affinity().assignments(exchFut.topologyVersion());
 
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
-        if (cctx.rebalanceEnabled()) {
-            boolean added = 
exchId.topologyVersion().equals(cctx.cacheStartTopologyVersion());
+        if (grp.rebalanceEnabled()) {
+            boolean added = 
exchId.topologyVersion().equals(grp.groupStartVersion());
 
             boolean first = (loc.equals(oldest) && 
loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
 
@@ -406,7 +414,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
                         boolean owned = locPart.own();
 
-                        assert owned : "Failed to own partition for oldest 
node [cacheName" + cctx.name() +
+                        assert owned : "Failed to own partition for oldest 
node [grp" + grp.name() +
                             ", part=" + locPart + ']';
 
                         if (log.isDebugEnabled())
@@ -465,7 +473,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) 
{
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
         for (int p = 0; p < num; p++) {
             if (node2part != null && node2part.valid()) {
@@ -493,20 +501,20 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         ClusterState newState = exchFut.newClusterState();
 
         treatAllPartAsLoc = (newState != null && newState == 
ClusterState.ACTIVE)
-            || (cctx.kernalContext().state().active()
+            || (ctx.kernalContext().state().active()
             && discoEvt.type() == EventType.EVT_NODE_JOINED
             && discoEvt.eventNode().isLocal()
-            && !cctx.kernalContext().clientNode()
+            && !ctx.kernalContext().clientNode()
         );
 
         // Wait for rent outside of checkpoint lock.
         waitForRent();
 
-        ClusterNode loc = cctx.localNode();
+        ClusterNode loc = ctx.localNode();
 
-        cctx.shared().database().checkpointReadLock();
+        ctx.database().checkpointReadLock();
 
-        synchronized (cctx.shared().exchange().interruptLock()) {
+        synchronized (ctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 throw new IgniteInterruptedCheckedException("Thread is 
interrupted: " + Thread.currentThread());
 
@@ -514,7 +522,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 U.writeLock(lock);
             }
             catch (IgniteInterruptedCheckedException e) {
-                cctx.shared().database().checkpointReadUnlock();
+                ctx.database().checkpointReadUnlock();
 
                 throw e;
             }
@@ -568,7 +576,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 if (affReady)
                     initPartitions0(exchFut, updateSeq);
                 else {
-                    List<List<ClusterNode>> aff = 
cctx.affinity().idealAssignment();
+                    List<List<ClusterNode>> aff = 
grp.affinity().idealAssignment();
 
                     createPartitions(aff, updateSeq);
                 }
@@ -582,7 +590,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             finally {
                 lock.writeLock().unlock();
 
-                cctx.shared().database().checkpointReadUnlock();
+                ctx.database().checkpointReadUnlock();
             }
         }
 
@@ -596,13 +604,13 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
         boolean changed = waitForRent();
 
-        int num = cctx.affinity().partitions();
+        int num = grp.affinity().partitions();
 
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
-        assert cctx.affinity().affinityTopologyVersion().equals(topVer) : 
"Affinity is not initialized " +
+        assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not 
initialized " +
             "[topVer=" + topVer +
-            ", affVer=" + cctx.affinity().affinityTopologyVersion() +
+            ", affVer=" + grp.affinity().lastVersion() +
             ", fut=" + exchFut + ']';
 
         lock.writeLock().lock();
@@ -623,7 +631,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, 
false, false);
 
-                if (cctx.affinity().partitionLocalNode(p, topVer)) {
+                if (grp.affinity().partitionLocalNode(p, topVer)) {
                     // This partition will be created during next topology 
event,
                     // which obviously has not happened at this point.
                     if (locPart == null) {
@@ -636,27 +644,28 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     GridDhtPartitionState state = locPart.state();
 
                     if (state == MOVING) {
-                        if (cctx.rebalanceEnabled()) {
+                        if (grp.rebalanceEnabled()) {
                             Collection<ClusterNode> owners = owners(p);
 
                             // If there are no other owners, then become an 
owner.
                             if (F.isEmpty(owners)) {
                                 boolean owned = locPart.own();
 
-                                assert owned : "Failed to own partition 
[cacheName" + cctx.name() + ", locPart=" +
+                                assert owned : "Failed to own partition [grp=" 
+ grp.name() + ", locPart=" +
                                     locPart + ']';
 
                                 updateSeq = updateLocal(p, locPart.state(), 
updateSeq);
 
                                 changed = true;
 
-                                if 
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                                    DiscoveryEvent discoEvt = 
exchFut.discoveryEvent();
-
-                                    cctx.events().addPreloadEvent(p,
-                                        EVT_CACHE_REBALANCE_PART_DATA_LOST, 
discoEvt.eventNode(),
-                                        discoEvt.type(), discoEvt.timestamp());
-                                }
+// TODO IGNITE-5075.
+//                                if 
(ctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+//                                    DiscoveryEvent discoEvt = 
exchFut.discoveryEvent();
+//
+//                                    cctx.events().addPreloadEvent(p,
+//                                        EVT_CACHE_REBALANCE_PART_DATA_LOST, 
discoEvt.eventNode(),
+//                                        discoEvt.type(), 
discoEvt.timestamp());
+//                                }
 
                                 if (log.isDebugEnabled())
                                     log.debug("Owned partition: " + locPart);
@@ -673,7 +682,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     if (locPart != null) {
                         GridDhtPartitionState state = locPart.state();
 
-                        if (state == MOVING && 
cctx.kernalContext().state().active()) {
+                        if (state == MOVING && 
ctx.kernalContext().state().active()) {
                             locPart.rent(false);
 
                             updateSeq = updateLocal(p, locPart.state(), 
updateSeq);
@@ -687,7 +696,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 }
             }
 
-            updateRebalanceVersion(cctx.affinity().assignments(topVer));
+            updateRebalanceVersion(grp.affinity().assignments(topVer));
 
             consistencyCheck();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 43e6af7..e197864 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -177,17 +177,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                 }
             });
 
-        if (!cctx.kernalContext().clientNode()) {
-            cctx.io().addHandler(cctx.cacheId(), 
GridDhtAffinityAssignmentRequest.class,
-                new MessageHandler<GridDhtAffinityAssignmentRequest>() {
-                    @Override protected void onMessage(ClusterNode node, 
GridDhtAffinityAssignmentRequest msg) {
-                        processAffinityAssignmentRequest(node, msg);
-                    }
-                });
-        }
-
-        cctx.shared().affinity().onCacheCreated(cctx);
-
         supplier = new GridDhtPartitionSupplier(cctx);
         demander = new GridDhtPartitionDemander(cctx);
 
@@ -590,45 +579,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /**
-     * @param node Node.
-     * @param req Request.
-     */
-    private void processAffinityAssignmentRequest(final ClusterNode node,
-        final GridDhtAffinityAssignmentRequest req) {
-        final AffinityTopologyVersion topVer = req.topologyVersion();
-
-        if (log.isDebugEnabled())
-            log.debug("Processing affinity assignment request [node=" + node + 
", req=" + req + ']');
-
-        cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-            @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                if (log.isDebugEnabled())
-                    log.debug("Affinity is ready for topology version, will 
send response [topVer=" + topVer +
-                        ", node=" + node + ']');
-
-                AffinityAssignment assignment = 
cctx.affinity().assignment(topVer);
-
-                GridDhtAffinityAssignmentResponse res = new 
GridDhtAffinityAssignmentResponse(cctx.cacheId(),
-                    topVer,
-                    assignment.assignment());
-
-                if 
(cctx.affinity().affinityCache().centralizedAffinityFunction()) {
-                    assert assignment.idealAssignment() != null;
-
-                    res.idealAffinityAssignment(assignment.idealAssignment());
-                }
-
-                try {
-                    cctx.io().send(node, res, AFFINITY_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send affinity assignment response 
to remote node [node=" + node + ']', e);
-                }
-            }
-        });
-    }
-
-    /**
      * Resends partitions on partition evict within configured timeout.
      *
      * @param part Evicted partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..6b6585b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1327,7 +1327,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         t.get1());
 
                     for (AffinityTopologyVersion topVer : t.get2()) {
-                        for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
+                        for (ClusterNode node : ctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer)) {
                             if (!node.isLocal()) {
                                 try {
                                     cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ebae16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a591517..c6dc114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -506,8 +506,8 @@ public class IgniteTxHandler {
         for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
             GridCacheContext ctx = e.context();
 
-            Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
-            Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
+            Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
+            Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 
             if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
                 return true;

Reply via email to